432 lines
11 KiB
Go
432 lines
11 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sort"
|
||
"time"
|
||
|
||
"go-common/app/admin/main/aegis/model"
|
||
"go-common/app/admin/main/aegis/model/net"
|
||
"go-common/library/ecode"
|
||
"go-common/library/log"
|
||
|
||
"github.com/jinzhu/gorm"
|
||
)
|
||
|
||
/**
|
||
* 业务下所有需要分发任务的flow,用net+flow中文名拼接
|
||
*/
|
||
func (s *Service) dispatchFlow(c context.Context, businessID []int64, limitFlow []int64) (res map[int64]map[int64]string, err error) {
|
||
var (
|
||
nets []*net.Net
|
||
tranID []int64
|
||
dirs []*net.Direction
|
||
flows []*net.Flow
|
||
)
|
||
|
||
//biz:flow:flow_name
|
||
res = map[int64]map[int64]string{}
|
||
//业务下所有可用网
|
||
if nets, err = s.gorm.NetsByBusiness(c, businessID, false); err != nil {
|
||
log.Error("dispatchFlow s.gorm.NetsByBusiness(%v) error(%v)", businessID, err)
|
||
return
|
||
}
|
||
netID := []int64{}
|
||
netMap := map[int64]*net.Net{}
|
||
for _, item := range nets {
|
||
netID = append(netID, item.ID)
|
||
netMap[item.ID] = item
|
||
}
|
||
//网下所有可用变迁
|
||
if tranID, err = s.tranIDByNet(c, netID, true, false); err != nil {
|
||
log.Error("dispatchFlow s.gorm.TransitionIDByNet(%v) error(%v) businessid(%d)", netID, err, businessID)
|
||
return
|
||
}
|
||
if len(tranID) == 0 {
|
||
return
|
||
}
|
||
//变迁所有可用的被指向的有向线
|
||
if dirs, err = s.gorm.DirectionByTransitionID(c, tranID, net.DirInput, false); err != nil {
|
||
log.Error("dispatchFlow s.gorm.DirectionByTransitionID error(%v) businessid(%d)", err, businessID)
|
||
return
|
||
}
|
||
limitFlowMap := map[int64]int{}
|
||
for _, item := range limitFlow {
|
||
limitFlowMap[item] = 1
|
||
}
|
||
accessFlow := []int64{}
|
||
for _, item := range dirs {
|
||
if len(limitFlowMap) > 0 && limitFlowMap[item.FlowID] <= 0 {
|
||
continue
|
||
}
|
||
accessFlow = append(accessFlow, item.FlowID)
|
||
}
|
||
if len(accessFlow) == 0 {
|
||
return
|
||
}
|
||
//拼接每个节点的中文名
|
||
if flows, err = s.flows(c, accessFlow, false); err != nil {
|
||
log.Error("dispatchFlow s.flows error(%v) businessid(%d)", err, businessID)
|
||
return
|
||
}
|
||
sort.Sort(net.FlowArr(flows))
|
||
for _, item := range flows {
|
||
if item == nil || netMap[item.NetID] == nil {
|
||
continue
|
||
}
|
||
|
||
nt := netMap[item.NetID]
|
||
if _, exist := res[nt.BusinessID]; !exist {
|
||
res[nt.BusinessID] = map[int64]string{}
|
||
}
|
||
res[nt.BusinessID][item.ID] = nt.ChName + item.ChName
|
||
}
|
||
return
|
||
}
|
||
|
||
//ShowFlow .
|
||
func (s *Service) ShowFlow(c context.Context, id int64) (r *net.ShowFlowResult, err error) {
|
||
var (
|
||
f *net.Flow
|
||
details map[int64][]*net.TokenBind
|
||
n *net.Net
|
||
)
|
||
|
||
if f, err = s.gorm.FlowByID(c, id); err != nil {
|
||
return
|
||
}
|
||
if details, err = s.gorm.TokenBindByElement(c, []int64{id}, []int8{net.BindTypeFlow}, true); err != nil {
|
||
return
|
||
}
|
||
if n, err = s.gorm.NetByID(c, f.NetID); err != nil {
|
||
return
|
||
}
|
||
r = &net.ShowFlowResult{
|
||
Flow: f,
|
||
Tokens: details[id],
|
||
IsStart: n.StartFlowID == id,
|
||
}
|
||
return
|
||
}
|
||
|
||
//GetFlowList .
|
||
func (s *Service) GetFlowList(c context.Context, pm *net.ListNetElementParam) (result *net.ListFlowRes, err error) {
|
||
var (
|
||
flowID []int64
|
||
tks map[int64][]*net.TokenBind
|
||
n *net.Net
|
||
uid = []int64{}
|
||
unames map[int64]string
|
||
)
|
||
|
||
if result, err = s.gorm.FlowList(c, pm); err != nil {
|
||
return
|
||
}
|
||
if len(result.Result) == 0 {
|
||
return
|
||
}
|
||
for _, item := range result.Result {
|
||
flowID = append(flowID, item.ID)
|
||
uid = append(uid, item.UID)
|
||
}
|
||
if tks, err = s.gorm.TokenBindByElement(c, flowID, []int8{net.BindTypeFlow}, true); err != nil {
|
||
return
|
||
}
|
||
if n, err = s.gorm.NetByID(c, pm.NetID); err != nil {
|
||
return
|
||
}
|
||
if unames, err = s.http.GetUnames(c, uid); err != nil {
|
||
log.Error("GetFlowList s.http.GetUnames error(%v)", err)
|
||
err = nil
|
||
}
|
||
for _, item := range result.Result {
|
||
item.IsStart = item.ID == n.StartFlowID
|
||
item.Username = unames[item.UID]
|
||
for _, bd := range tks[item.ID] {
|
||
item.Tokens = append(item.Tokens, bd.ChName)
|
||
}
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
//GetFlowByNet .
|
||
func (s *Service) GetFlowByNet(c context.Context, netID int64) (result map[int64]string, err error) {
|
||
var (
|
||
flows []*net.Flow
|
||
)
|
||
result = map[int64]string{}
|
||
if flows, err = s.gorm.FlowsByNet(c, []int64{netID}); err != nil {
|
||
log.Error("GetFlowByNet s.gorm.FlowsByNet(%d) error(%v)", netID, err)
|
||
return
|
||
}
|
||
|
||
for _, item := range flows {
|
||
result[item.ID] = item.ChName
|
||
}
|
||
return
|
||
}
|
||
|
||
func (s *Service) checkFlowUnique(c context.Context, netID int64, name string) (err error, msg string) {
|
||
var exist *net.Flow
|
||
if exist, err = s.gorm.FlowByUnique(c, netID, name); err != nil {
|
||
log.Error("checkFlowUnique s.gorm.FlowByUnique(%d,%s) error(%v)", netID, name, err)
|
||
return
|
||
}
|
||
if exist != nil {
|
||
err = ecode.AegisUniqueAlreadyExist
|
||
msg = fmt.Sprintf(ecode.AegisUniqueAlreadyExist.Message(), "节点", name)
|
||
}
|
||
return
|
||
}
|
||
|
||
func (s *Service) checkStartFlowBind(oldFlow *net.Flow, tokenIDList []int64) (err error, msg string) {
|
||
if oldFlow != nil && !oldFlow.IsAvailable() {
|
||
err = ecode.AegisFlowDisabled
|
||
msg = fmt.Sprintf("%s,不能作为初始节点", ecode.AegisFlowDisabled.Message())
|
||
return
|
||
}
|
||
//第一版动态审核初始接入状态:敏感待审、非敏感待审、高频转发待审,非一个确定性值,而系统不提供条件判断和guard解析,由
|
||
//配置初始节点没有令牌,而业务start时自动传入state支持(后续接入统一初始状态进而条件分状态的逻辑后,去掉state字段,且加上该判断)
|
||
//if len(tokenIDList) == 0 {
|
||
// err = ecode.AegisFlowNoToken
|
||
// msg = fmt.Sprintf("%s,不能作为初始节点", ecode.AegisFlowNoToken.Message())
|
||
//}
|
||
|
||
return
|
||
}
|
||
|
||
//AddFlow .
|
||
func (s *Service) AddFlow(c context.Context, uid int64, f *net.FlowEditParam) (id int64, err error, msg string) {
|
||
var (
|
||
tx *gorm.DB
|
||
diff = []string{}
|
||
diffBind string
|
||
)
|
||
if err, msg = s.checkFlowUnique(c, f.NetID, f.Name); err != nil {
|
||
return
|
||
}
|
||
if f.IsStart {
|
||
if err, msg = s.checkStartFlowBind(nil, f.TokenIDList); err != nil {
|
||
return
|
||
}
|
||
diff = append(diff, model.LogFieldTemp(model.LogFieldStartFlow, f.IsStart, false, false))
|
||
}
|
||
flow := &net.Flow{
|
||
NetID: f.NetID,
|
||
Name: f.Name,
|
||
ChName: f.ChName,
|
||
Description: f.Description,
|
||
UID: uid,
|
||
}
|
||
|
||
//db update
|
||
tx, err = s.gorm.BeginTx(c)
|
||
if err != nil {
|
||
log.Error("AddFlow s.gorm.BeginTx error(%v)", err)
|
||
return
|
||
}
|
||
if err = s.gorm.AddItem(c, tx, flow); err != nil {
|
||
tx.Rollback()
|
||
return
|
||
}
|
||
if diffBind, _, err, msg = s.compareFlowBind(c, tx, flow.ID, f.TokenIDList, false); err != nil {
|
||
log.Error("AddFlow s.compareFlowBind error(%v) params(%+v)", err, f)
|
||
tx.Rollback()
|
||
return
|
||
}
|
||
if diffBind != "" {
|
||
diff = append(diff, diffBind)
|
||
}
|
||
if f.IsStart {
|
||
if err = s.gorm.NetBindStartFlow(c, tx, flow.NetID, flow.ID); err != nil {
|
||
tx.Rollback()
|
||
return
|
||
}
|
||
}
|
||
if err = tx.Commit().Error; err != nil {
|
||
log.Error("AddFlow tx.Commit error(%v)", err)
|
||
return
|
||
}
|
||
|
||
id = flow.ID
|
||
|
||
//日志
|
||
diff = append(diff, model.LogFieldTemp(model.LogFieldChName, f.ChName, "", false))
|
||
diff = append(diff, model.LogFieldTemp(model.LogFieldName, f.Name, "", false))
|
||
oper := &model.NetConfOper{
|
||
OID: flow.ID,
|
||
Action: model.LogNetActionNew,
|
||
UID: flow.UID,
|
||
NetID: flow.NetID,
|
||
ChName: flow.ChName,
|
||
FlowID: flow.ID,
|
||
Diff: diff,
|
||
}
|
||
s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
|
||
return
|
||
}
|
||
|
||
// UpdateFlow .
|
||
func (s *Service) UpdateFlow(c context.Context, uid int64, f *net.FlowEditParam) (err error, msg string) {
|
||
var (
|
||
old *net.Flow
|
||
n *net.Net
|
||
startFlowID int64 = -1
|
||
updates = map[string]interface{}{}
|
||
tx *gorm.DB
|
||
diff = []string{}
|
||
diffBind string
|
||
changedBind []int64
|
||
)
|
||
if old, err = s.gorm.FlowByID(c, f.ID); err != nil {
|
||
log.Error("UpdateFlow s.gorm.FlowByID(%d) error(%v)", f.ID, err)
|
||
return
|
||
}
|
||
if n, err = s.gorm.NetByID(c, old.NetID); err != nil {
|
||
log.Error("UpdateFlow s.gorm.NetByID(%d) error(%v) flowid(%d)", old.NetID, err, f.ID)
|
||
return
|
||
}
|
||
if f.IsStart && n.StartFlowID != f.ID {
|
||
startFlowID = f.ID
|
||
} else if !f.IsStart && n.StartFlowID == f.ID {
|
||
startFlowID = 0
|
||
}
|
||
|
||
if f.IsStart {
|
||
if err, msg = s.checkStartFlowBind(old, f.TokenIDList); err != nil {
|
||
return
|
||
}
|
||
diff = append(diff, model.LogFieldTemp(model.LogFieldStartFlow, true, false, true))
|
||
}
|
||
|
||
if f.Name != old.Name {
|
||
if err, msg = s.checkFlowUnique(c, old.NetID, f.Name); err != nil {
|
||
return
|
||
}
|
||
diff = append(diff, model.LogFieldTemp(model.LogFieldName, f.Name, old.Name, true))
|
||
old.Name = f.Name
|
||
updates["name"] = f.Name
|
||
}
|
||
if f.ChName != old.ChName {
|
||
diff = append(diff, model.LogFieldTemp(model.LogFieldChName, f.ChName, old.ChName, true))
|
||
old.ChName = f.ChName
|
||
updates["ch_name"] = f.ChName
|
||
}
|
||
if f.Description != old.Description {
|
||
old.Description = f.Description
|
||
updates["description"] = f.Description
|
||
}
|
||
|
||
//db update
|
||
tx, err = s.gorm.BeginTx(c)
|
||
if err != nil {
|
||
log.Error("UpdateFlow s.gorm.BeginTx error(%v)", err)
|
||
return
|
||
}
|
||
if len(updates) > 0 {
|
||
if err = s.gorm.UpdateFields(c, tx, net.TableFlow, old.ID, updates); err != nil {
|
||
tx.Rollback()
|
||
return
|
||
}
|
||
}
|
||
if startFlowID >= 0 {
|
||
if err = s.gorm.NetBindStartFlow(c, tx, n.ID, startFlowID); err != nil {
|
||
tx.Rollback()
|
||
return
|
||
}
|
||
}
|
||
if diffBind, changedBind, err, msg = s.compareFlowBind(c, tx, f.ID, f.TokenIDList, true); err != nil {
|
||
log.Error("updateFlow s.compareFlowBind error(%v) params(%+v)", err, f)
|
||
tx.Rollback()
|
||
return
|
||
}
|
||
if diffBind != "" {
|
||
diff = append(diff, diffBind)
|
||
}
|
||
if err = tx.Commit().Error; err != nil {
|
||
log.Error("UpdateFlow tx.Commit error(%v)", err)
|
||
return
|
||
}
|
||
s.delFlowCache(c, old, changedBind)
|
||
|
||
//日志
|
||
if len(diff) == 0 {
|
||
return
|
||
}
|
||
oper := &model.NetConfOper{
|
||
OID: old.ID,
|
||
Action: model.LogNetActionUpdate,
|
||
UID: uid,
|
||
NetID: old.NetID,
|
||
ChName: old.ChName,
|
||
FlowID: old.ID,
|
||
Diff: diff,
|
||
}
|
||
s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
|
||
return
|
||
}
|
||
|
||
//SwitchFlow .
|
||
func (s *Service) SwitchFlow(c context.Context, id int64, needDisable bool) (err error) {
|
||
var (
|
||
old *net.Flow
|
||
n *net.Net
|
||
dirs []*net.Direction
|
||
action string
|
||
)
|
||
if old, err = s.gorm.FlowByID(c, id); err != nil {
|
||
log.Error("SwitchFlow s.gorm.FlowByID(%d) error(%v) needDisable(%v)", id, err, needDisable)
|
||
return
|
||
}
|
||
log.Info("SwitchFlow id(%d) needdisable(%v) old-flow(%+v)", id, needDisable, old)
|
||
available := old.IsAvailable()
|
||
if available == !needDisable {
|
||
return
|
||
}
|
||
|
||
if needDisable {
|
||
if dirs, err = s.gorm.DirectionByFlowID(c, []int64{id}, 0); err != nil {
|
||
log.Error("SwitchFlow s.gorm.DirectionByFlowID(%d) error(%v)", id, err)
|
||
return
|
||
}
|
||
if len(dirs) > 0 {
|
||
log.Error("SwitchFlow dir by flow(%d) founded", id)
|
||
err = ecode.AegisFlowBinded
|
||
return
|
||
}
|
||
if n, err = s.gorm.NetByID(c, old.NetID); err != nil {
|
||
log.Error("SwitchFlow s.gorm.NetByID(%d) error(%v) flow(%d)", old.NetID, err, id)
|
||
return
|
||
}
|
||
if n.StartFlowID == id {
|
||
log.Error("SwitchFlow net(%d).startflow=flow(%d) founded", n.ID, id)
|
||
err = ecode.AegisFlowBinded
|
||
return
|
||
}
|
||
old.DisableTime = time.Now()
|
||
action = model.LogNetActionDisable
|
||
} else {
|
||
old.DisableTime = net.Recovered
|
||
action = model.LogNetActionAvailable
|
||
}
|
||
|
||
if err = s.gorm.UpdateFields(c, nil, net.TableFlow, id, map[string]interface{}{"disable_time": old.DisableTime}); err != nil {
|
||
return
|
||
}
|
||
s.delFlowCache(c, old, nil)
|
||
|
||
//日志
|
||
oper := &model.NetConfOper{
|
||
OID: old.ID,
|
||
Action: action,
|
||
UID: old.UID,
|
||
NetID: old.NetID,
|
||
ChName: old.ChName,
|
||
FlowID: old.ID,
|
||
}
|
||
s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
|
||
return
|
||
}
|