go-common/app/admin/main/aegis/service/task_consumer.go

136 lines
3.5 KiB
Go
Raw Permalink Normal View History

2019-04-22 02:59:20 +00:00
package service
import (
"context"
"go-common/app/admin/main/aegis/model/common"
"go-common/app/admin/main/aegis/model/task"
"go-common/library/log"
"go-common/library/sync/errgroup"
)
// On 上线
func (s *Service) On(c context.Context, opt *common.BaseOptions) (err error) {
if err = s.mc.ConsumerOn(c, opt); err != nil {
log.Error("s.mc.ConsumerOn mc错误(%v)", err)
err = nil
}
if err = s.mysql.ConsumerOn(c, opt); err != nil {
return
}
go s.sendTaskConsumerLog(context.TODO(), "on", opt)
return
}
// Off 离线
func (s *Service) Off(c context.Context, opt *common.BaseOptions) (err error) {
if err = s.mc.ConsumerOff(c, opt); err != nil {
log.Error("s.mc.ConsumerOff mc错误(%v)", err)
err = nil
}
if err = s.mysql.ConsumerOff(c, opt); err != nil {
return
}
go s.sendTaskConsumerLog(context.TODO(), "off", opt)
return
}
// KickOut 踢出
func (s *Service) KickOut(c context.Context, opt *common.BaseOptions, kickuid int64) (err error) {
opt.UID = kickuid
unames, _ := s.http.GetUnames(c, []int64{kickuid})
opt.Uname = unames[kickuid]
return s.Off(c, opt)
}
// Watcher 监控管理
func (s *Service) Watcher(c context.Context, bizid, flowid int64, role int8) (watchers []*task.WatchItem, err error) {
// 从数据库拿出24小时内有变化的或者依然在线的
var wis []*task.WatchItem
if wis, err = s.mysql.ConsumerStat(c, bizid, flowid); err != nil {
return
}
bopt := &common.BaseOptions{
BusinessID: bizid,
FlowID: flowid,
}
var (
onuids, offuids []int64
inxmap = make(map[int64]int)
)
for _, item := range wis {
bopt.UID = item.UID
bopt.Uname = ""
uname, urole, err := s.GetRole(c, bopt)
// 组员列表展示 组员 + 角色获取失败的 + 非任务角色(例如平台管理员)
if err != nil || role != urole || len(uname) == 0 {
if role == task.TaskRoleLeader || urole == task.TaskRoleLeader {
continue
}
}
isOn, _ := s.mc.IsConsumerOn(c, bopt)
item.Role = urole
item.IsOnLine = isOn
item.Uname = uname
if item.IsOnLine {
onuids = append(onuids, item.UID)
item.LastOn = item.Mtime.Format("2006-01-02 15:04:05")
} else {
offuids = append(offuids, item.UID)
item.LastOff = item.Mtime.Format("2006-01-02 15:04:05")
}
inxmap[item.UID] = len(watchers)
watchers = append(watchers, item)
}
// 补充laston 或者 lastoff
wg, ctx := errgroup.WithContext(c)
if len(onuids) > 0 {
wg.Go(func() error {
at, err := s.searchConsumerLog(ctx, bizid, flowid, []string{"off", "kickout"}, onuids, len(onuids))
if err == nil {
for uid, ctime := range at {
watchers[inxmap[uid]].LastOff = ctime
}
}
return err
})
}
if len(offuids) > 0 {
wg.Go(func() error {
at, err := s.searchConsumerLog(ctx, bizid, flowid, []string{"on"}, offuids, len(offuids))
if err == nil {
for uid, ctime := range at {
watchers[inxmap[uid]].LastOn = ctime
}
}
return err
})
}
wg.Go(func() error {
trans := func(c context.Context, ids []int64) (map[int64][]interface{}, error) {
return s.MemberStats(c, bizid, flowid, ids)
}
return s.mulIDtoName(ctx, watchers, trans, "UID", "Count", "CompleteRate", "PassRate", "AvgUT")
})
wg.Wait()
return
}
// IsOn .
func (s *Service) IsOn(c context.Context, opt *common.BaseOptions) bool {
on, err := s.mc.IsConsumerOn(c, opt)
if err != nil {
log.Error("s.mc.ConsumerOff mc错误(%v)", err)
if on, err = s.mysql.IsConsumerOn(c, opt); err != nil {
log.Error("s.mysql.ConsumerOff mysql错误(%v)", err)
}
}
return on
}