432 lines
11 KiB
Go
432 lines
11 KiB
Go
|
package service
|
|||
|
|
|||
|
import (
|
|||
|
"context"
|
|||
|
"fmt"
|
|||
|
"sort"
|
|||
|
"time"
|
|||
|
|
|||
|
"go-common/app/admin/main/aegis/model"
|
|||
|
"go-common/app/admin/main/aegis/model/net"
|
|||
|
"go-common/library/ecode"
|
|||
|
"go-common/library/log"
|
|||
|
|
|||
|
"github.com/jinzhu/gorm"
|
|||
|
)
|
|||
|
|
|||
|
/**
|
|||
|
* 业务下所有需要分发任务的flow,用net+flow中文名拼接
|
|||
|
*/
|
|||
|
func (s *Service) dispatchFlow(c context.Context, businessID []int64, limitFlow []int64) (res map[int64]map[int64]string, err error) {
|
|||
|
var (
|
|||
|
nets []*net.Net
|
|||
|
tranID []int64
|
|||
|
dirs []*net.Direction
|
|||
|
flows []*net.Flow
|
|||
|
)
|
|||
|
|
|||
|
//biz:flow:flow_name
|
|||
|
res = map[int64]map[int64]string{}
|
|||
|
//业务下所有可用网
|
|||
|
if nets, err = s.gorm.NetsByBusiness(c, businessID, false); err != nil {
|
|||
|
log.Error("dispatchFlow s.gorm.NetsByBusiness(%v) error(%v)", businessID, err)
|
|||
|
return
|
|||
|
}
|
|||
|
netID := []int64{}
|
|||
|
netMap := map[int64]*net.Net{}
|
|||
|
for _, item := range nets {
|
|||
|
netID = append(netID, item.ID)
|
|||
|
netMap[item.ID] = item
|
|||
|
}
|
|||
|
//网下所有可用变迁
|
|||
|
if tranID, err = s.tranIDByNet(c, netID, true, false); err != nil {
|
|||
|
log.Error("dispatchFlow s.gorm.TransitionIDByNet(%v) error(%v) businessid(%d)", netID, err, businessID)
|
|||
|
return
|
|||
|
}
|
|||
|
if len(tranID) == 0 {
|
|||
|
return
|
|||
|
}
|
|||
|
//变迁所有可用的被指向的有向线
|
|||
|
if dirs, err = s.gorm.DirectionByTransitionID(c, tranID, net.DirInput, false); err != nil {
|
|||
|
log.Error("dispatchFlow s.gorm.DirectionByTransitionID error(%v) businessid(%d)", err, businessID)
|
|||
|
return
|
|||
|
}
|
|||
|
limitFlowMap := map[int64]int{}
|
|||
|
for _, item := range limitFlow {
|
|||
|
limitFlowMap[item] = 1
|
|||
|
}
|
|||
|
accessFlow := []int64{}
|
|||
|
for _, item := range dirs {
|
|||
|
if len(limitFlowMap) > 0 && limitFlowMap[item.FlowID] <= 0 {
|
|||
|
continue
|
|||
|
}
|
|||
|
accessFlow = append(accessFlow, item.FlowID)
|
|||
|
}
|
|||
|
if len(accessFlow) == 0 {
|
|||
|
return
|
|||
|
}
|
|||
|
//拼接每个节点的中文名
|
|||
|
if flows, err = s.flows(c, accessFlow, false); err != nil {
|
|||
|
log.Error("dispatchFlow s.flows error(%v) businessid(%d)", err, businessID)
|
|||
|
return
|
|||
|
}
|
|||
|
sort.Sort(net.FlowArr(flows))
|
|||
|
for _, item := range flows {
|
|||
|
if item == nil || netMap[item.NetID] == nil {
|
|||
|
continue
|
|||
|
}
|
|||
|
|
|||
|
nt := netMap[item.NetID]
|
|||
|
if _, exist := res[nt.BusinessID]; !exist {
|
|||
|
res[nt.BusinessID] = map[int64]string{}
|
|||
|
}
|
|||
|
res[nt.BusinessID][item.ID] = nt.ChName + item.ChName
|
|||
|
}
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
//ShowFlow .
|
|||
|
func (s *Service) ShowFlow(c context.Context, id int64) (r *net.ShowFlowResult, err error) {
|
|||
|
var (
|
|||
|
f *net.Flow
|
|||
|
details map[int64][]*net.TokenBind
|
|||
|
n *net.Net
|
|||
|
)
|
|||
|
|
|||
|
if f, err = s.gorm.FlowByID(c, id); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
if details, err = s.gorm.TokenBindByElement(c, []int64{id}, []int8{net.BindTypeFlow}, true); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
if n, err = s.gorm.NetByID(c, f.NetID); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
r = &net.ShowFlowResult{
|
|||
|
Flow: f,
|
|||
|
Tokens: details[id],
|
|||
|
IsStart: n.StartFlowID == id,
|
|||
|
}
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
//GetFlowList .
|
|||
|
func (s *Service) GetFlowList(c context.Context, pm *net.ListNetElementParam) (result *net.ListFlowRes, err error) {
|
|||
|
var (
|
|||
|
flowID []int64
|
|||
|
tks map[int64][]*net.TokenBind
|
|||
|
n *net.Net
|
|||
|
uid = []int64{}
|
|||
|
unames map[int64]string
|
|||
|
)
|
|||
|
|
|||
|
if result, err = s.gorm.FlowList(c, pm); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
if len(result.Result) == 0 {
|
|||
|
return
|
|||
|
}
|
|||
|
for _, item := range result.Result {
|
|||
|
flowID = append(flowID, item.ID)
|
|||
|
uid = append(uid, item.UID)
|
|||
|
}
|
|||
|
if tks, err = s.gorm.TokenBindByElement(c, flowID, []int8{net.BindTypeFlow}, true); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
if n, err = s.gorm.NetByID(c, pm.NetID); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
if unames, err = s.http.GetUnames(c, uid); err != nil {
|
|||
|
log.Error("GetFlowList s.http.GetUnames error(%v)", err)
|
|||
|
err = nil
|
|||
|
}
|
|||
|
for _, item := range result.Result {
|
|||
|
item.IsStart = item.ID == n.StartFlowID
|
|||
|
item.Username = unames[item.UID]
|
|||
|
for _, bd := range tks[item.ID] {
|
|||
|
item.Tokens = append(item.Tokens, bd.ChName)
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
//GetFlowByNet .
|
|||
|
func (s *Service) GetFlowByNet(c context.Context, netID int64) (result map[int64]string, err error) {
|
|||
|
var (
|
|||
|
flows []*net.Flow
|
|||
|
)
|
|||
|
result = map[int64]string{}
|
|||
|
if flows, err = s.gorm.FlowsByNet(c, []int64{netID}); err != nil {
|
|||
|
log.Error("GetFlowByNet s.gorm.FlowsByNet(%d) error(%v)", netID, err)
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
for _, item := range flows {
|
|||
|
result[item.ID] = item.ChName
|
|||
|
}
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
func (s *Service) checkFlowUnique(c context.Context, netID int64, name string) (err error, msg string) {
|
|||
|
var exist *net.Flow
|
|||
|
if exist, err = s.gorm.FlowByUnique(c, netID, name); err != nil {
|
|||
|
log.Error("checkFlowUnique s.gorm.FlowByUnique(%d,%s) error(%v)", netID, name, err)
|
|||
|
return
|
|||
|
}
|
|||
|
if exist != nil {
|
|||
|
err = ecode.AegisUniqueAlreadyExist
|
|||
|
msg = fmt.Sprintf(ecode.AegisUniqueAlreadyExist.Message(), "节点", name)
|
|||
|
}
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
func (s *Service) checkStartFlowBind(oldFlow *net.Flow, tokenIDList []int64) (err error, msg string) {
|
|||
|
if oldFlow != nil && !oldFlow.IsAvailable() {
|
|||
|
err = ecode.AegisFlowDisabled
|
|||
|
msg = fmt.Sprintf("%s,不能作为初始节点", ecode.AegisFlowDisabled.Message())
|
|||
|
return
|
|||
|
}
|
|||
|
//第一版动态审核初始接入状态:敏感待审、非敏感待审、高频转发待审,非一个确定性值,而系统不提供条件判断和guard解析,由
|
|||
|
//配置初始节点没有令牌,而业务start时自动传入state支持(后续接入统一初始状态进而条件分状态的逻辑后,去掉state字段,且加上该判断)
|
|||
|
//if len(tokenIDList) == 0 {
|
|||
|
// err = ecode.AegisFlowNoToken
|
|||
|
// msg = fmt.Sprintf("%s,不能作为初始节点", ecode.AegisFlowNoToken.Message())
|
|||
|
//}
|
|||
|
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
//AddFlow .
|
|||
|
func (s *Service) AddFlow(c context.Context, uid int64, f *net.FlowEditParam) (id int64, err error, msg string) {
|
|||
|
var (
|
|||
|
tx *gorm.DB
|
|||
|
diff = []string{}
|
|||
|
diffBind string
|
|||
|
)
|
|||
|
if err, msg = s.checkFlowUnique(c, f.NetID, f.Name); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
if f.IsStart {
|
|||
|
if err, msg = s.checkStartFlowBind(nil, f.TokenIDList); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
diff = append(diff, model.LogFieldTemp(model.LogFieldStartFlow, f.IsStart, false, false))
|
|||
|
}
|
|||
|
flow := &net.Flow{
|
|||
|
NetID: f.NetID,
|
|||
|
Name: f.Name,
|
|||
|
ChName: f.ChName,
|
|||
|
Description: f.Description,
|
|||
|
UID: uid,
|
|||
|
}
|
|||
|
|
|||
|
//db update
|
|||
|
tx, err = s.gorm.BeginTx(c)
|
|||
|
if err != nil {
|
|||
|
log.Error("AddFlow s.gorm.BeginTx error(%v)", err)
|
|||
|
return
|
|||
|
}
|
|||
|
if err = s.gorm.AddItem(c, tx, flow); err != nil {
|
|||
|
tx.Rollback()
|
|||
|
return
|
|||
|
}
|
|||
|
if diffBind, _, err, msg = s.compareFlowBind(c, tx, flow.ID, f.TokenIDList, false); err != nil {
|
|||
|
log.Error("AddFlow s.compareFlowBind error(%v) params(%+v)", err, f)
|
|||
|
tx.Rollback()
|
|||
|
return
|
|||
|
}
|
|||
|
if diffBind != "" {
|
|||
|
diff = append(diff, diffBind)
|
|||
|
}
|
|||
|
if f.IsStart {
|
|||
|
if err = s.gorm.NetBindStartFlow(c, tx, flow.NetID, flow.ID); err != nil {
|
|||
|
tx.Rollback()
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
if err = tx.Commit().Error; err != nil {
|
|||
|
log.Error("AddFlow tx.Commit error(%v)", err)
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
id = flow.ID
|
|||
|
|
|||
|
//日志
|
|||
|
diff = append(diff, model.LogFieldTemp(model.LogFieldChName, f.ChName, "", false))
|
|||
|
diff = append(diff, model.LogFieldTemp(model.LogFieldName, f.Name, "", false))
|
|||
|
oper := &model.NetConfOper{
|
|||
|
OID: flow.ID,
|
|||
|
Action: model.LogNetActionNew,
|
|||
|
UID: flow.UID,
|
|||
|
NetID: flow.NetID,
|
|||
|
ChName: flow.ChName,
|
|||
|
FlowID: flow.ID,
|
|||
|
Diff: diff,
|
|||
|
}
|
|||
|
s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
// UpdateFlow .
|
|||
|
func (s *Service) UpdateFlow(c context.Context, uid int64, f *net.FlowEditParam) (err error, msg string) {
|
|||
|
var (
|
|||
|
old *net.Flow
|
|||
|
n *net.Net
|
|||
|
startFlowID int64 = -1
|
|||
|
updates = map[string]interface{}{}
|
|||
|
tx *gorm.DB
|
|||
|
diff = []string{}
|
|||
|
diffBind string
|
|||
|
changedBind []int64
|
|||
|
)
|
|||
|
if old, err = s.gorm.FlowByID(c, f.ID); err != nil {
|
|||
|
log.Error("UpdateFlow s.gorm.FlowByID(%d) error(%v)", f.ID, err)
|
|||
|
return
|
|||
|
}
|
|||
|
if n, err = s.gorm.NetByID(c, old.NetID); err != nil {
|
|||
|
log.Error("UpdateFlow s.gorm.NetByID(%d) error(%v) flowid(%d)", old.NetID, err, f.ID)
|
|||
|
return
|
|||
|
}
|
|||
|
if f.IsStart && n.StartFlowID != f.ID {
|
|||
|
startFlowID = f.ID
|
|||
|
} else if !f.IsStart && n.StartFlowID == f.ID {
|
|||
|
startFlowID = 0
|
|||
|
}
|
|||
|
|
|||
|
if f.IsStart {
|
|||
|
if err, msg = s.checkStartFlowBind(old, f.TokenIDList); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
diff = append(diff, model.LogFieldTemp(model.LogFieldStartFlow, true, false, true))
|
|||
|
}
|
|||
|
|
|||
|
if f.Name != old.Name {
|
|||
|
if err, msg = s.checkFlowUnique(c, old.NetID, f.Name); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
diff = append(diff, model.LogFieldTemp(model.LogFieldName, f.Name, old.Name, true))
|
|||
|
old.Name = f.Name
|
|||
|
updates["name"] = f.Name
|
|||
|
}
|
|||
|
if f.ChName != old.ChName {
|
|||
|
diff = append(diff, model.LogFieldTemp(model.LogFieldChName, f.ChName, old.ChName, true))
|
|||
|
old.ChName = f.ChName
|
|||
|
updates["ch_name"] = f.ChName
|
|||
|
}
|
|||
|
if f.Description != old.Description {
|
|||
|
old.Description = f.Description
|
|||
|
updates["description"] = f.Description
|
|||
|
}
|
|||
|
|
|||
|
//db update
|
|||
|
tx, err = s.gorm.BeginTx(c)
|
|||
|
if err != nil {
|
|||
|
log.Error("UpdateFlow s.gorm.BeginTx error(%v)", err)
|
|||
|
return
|
|||
|
}
|
|||
|
if len(updates) > 0 {
|
|||
|
if err = s.gorm.UpdateFields(c, tx, net.TableFlow, old.ID, updates); err != nil {
|
|||
|
tx.Rollback()
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
if startFlowID >= 0 {
|
|||
|
if err = s.gorm.NetBindStartFlow(c, tx, n.ID, startFlowID); err != nil {
|
|||
|
tx.Rollback()
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
if diffBind, changedBind, err, msg = s.compareFlowBind(c, tx, f.ID, f.TokenIDList, true); err != nil {
|
|||
|
log.Error("updateFlow s.compareFlowBind error(%v) params(%+v)", err, f)
|
|||
|
tx.Rollback()
|
|||
|
return
|
|||
|
}
|
|||
|
if diffBind != "" {
|
|||
|
diff = append(diff, diffBind)
|
|||
|
}
|
|||
|
if err = tx.Commit().Error; err != nil {
|
|||
|
log.Error("UpdateFlow tx.Commit error(%v)", err)
|
|||
|
return
|
|||
|
}
|
|||
|
s.delFlowCache(c, old, changedBind)
|
|||
|
|
|||
|
//日志
|
|||
|
if len(diff) == 0 {
|
|||
|
return
|
|||
|
}
|
|||
|
oper := &model.NetConfOper{
|
|||
|
OID: old.ID,
|
|||
|
Action: model.LogNetActionUpdate,
|
|||
|
UID: uid,
|
|||
|
NetID: old.NetID,
|
|||
|
ChName: old.ChName,
|
|||
|
FlowID: old.ID,
|
|||
|
Diff: diff,
|
|||
|
}
|
|||
|
s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
//SwitchFlow .
|
|||
|
func (s *Service) SwitchFlow(c context.Context, id int64, needDisable bool) (err error) {
|
|||
|
var (
|
|||
|
old *net.Flow
|
|||
|
n *net.Net
|
|||
|
dirs []*net.Direction
|
|||
|
action string
|
|||
|
)
|
|||
|
if old, err = s.gorm.FlowByID(c, id); err != nil {
|
|||
|
log.Error("SwitchFlow s.gorm.FlowByID(%d) error(%v) needDisable(%v)", id, err, needDisable)
|
|||
|
return
|
|||
|
}
|
|||
|
log.Info("SwitchFlow id(%d) needdisable(%v) old-flow(%+v)", id, needDisable, old)
|
|||
|
available := old.IsAvailable()
|
|||
|
if available == !needDisable {
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
if needDisable {
|
|||
|
if dirs, err = s.gorm.DirectionByFlowID(c, []int64{id}, 0); err != nil {
|
|||
|
log.Error("SwitchFlow s.gorm.DirectionByFlowID(%d) error(%v)", id, err)
|
|||
|
return
|
|||
|
}
|
|||
|
if len(dirs) > 0 {
|
|||
|
log.Error("SwitchFlow dir by flow(%d) founded", id)
|
|||
|
err = ecode.AegisFlowBinded
|
|||
|
return
|
|||
|
}
|
|||
|
if n, err = s.gorm.NetByID(c, old.NetID); err != nil {
|
|||
|
log.Error("SwitchFlow s.gorm.NetByID(%d) error(%v) flow(%d)", old.NetID, err, id)
|
|||
|
return
|
|||
|
}
|
|||
|
if n.StartFlowID == id {
|
|||
|
log.Error("SwitchFlow net(%d).startflow=flow(%d) founded", n.ID, id)
|
|||
|
err = ecode.AegisFlowBinded
|
|||
|
return
|
|||
|
}
|
|||
|
old.DisableTime = time.Now()
|
|||
|
action = model.LogNetActionDisable
|
|||
|
} else {
|
|||
|
old.DisableTime = net.Recovered
|
|||
|
action = model.LogNetActionAvailable
|
|||
|
}
|
|||
|
|
|||
|
if err = s.gorm.UpdateFields(c, nil, net.TableFlow, id, map[string]interface{}{"disable_time": old.DisableTime}); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
s.delFlowCache(c, old, nil)
|
|||
|
|
|||
|
//日志
|
|||
|
oper := &model.NetConfOper{
|
|||
|
OID: old.ID,
|
|||
|
Action: action,
|
|||
|
UID: old.UID,
|
|||
|
NetID: old.NetID,
|
|||
|
ChName: old.ChName,
|
|||
|
FlowID: old.ID,
|
|||
|
}
|
|||
|
s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
|
|||
|
return
|
|||
|
}
|