go-common/app/admin/main/aegis/service/task_dispatch.go
2019-04-22 02:59:20 +00:00

439 lines
12 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"errors"
"time"
"go-common/app/admin/main/aegis/model/common"
taskmod "go-common/app/admin/main/aegis/model/task"
"go-common/library/cache/redis"
"go-common/library/ecode"
"go-common/library/log"
"github.com/jinzhu/gorm"
)
//ERROR
var (
ErrTaskMiss = errors.New("task miss")
)
// NextTask 下一个任务
func (s *Service) NextTask(c context.Context, opt *taskmod.NextOptions) (tasks []*taskmod.Task, count int64, err error) {
log.Info("task-NextTask opt(%+v)", opt)
if count, err = s.countPersonalTask(c, &opt.BaseOptions, opt.NoCache); err != nil {
return
}
if count < opt.DispatchCount {
if count, err = s.syncSeize(c, opt); err != nil {
return
}
}
/* 去掉异步抢占
else if count < opt.SeizeCount {
s.asyncSeize(opt)
}
*/
if count == 0 {
return
}
return s.dispatch(c, opt)
}
// ListTasks 实时列表,停滞列表,延迟列表
func (s *Service) ListTasks(c context.Context, opt *taskmod.ListOptions) (tasks []*taskmod.Task, count int64, err error) {
switch opt.State {
case 1: // 实时任务,从redis取出,在数据库校验
tasks, count, err = s.listUnseized(c, opt)
case 2: // 停滞任务,组员的直接从redis取。组长的从数据库取idredis取任务
tasks, count, err = s.listMyTasks(c, "seized", opt)
case 3: // 延迟任务,组员的直接从redis取。组长的从数据库取idredis取任务
tasks, count, err = s.listMyTasks(c, "delayd", opt)
case 4: // 指派停滞任务,从数据库取idredis取任务
tasks, count, err = s.listMyTasks(c, "assignd", opt)
default: // 所有未完成任务
}
if err != nil {
tasks, count, err = s.mysql.ListTasks(c, opt)
}
opt.Total = int(count)
return
}
// Task 直接读取某个任务
func (s *Service) Task(c context.Context, tid int64) (task *taskmod.Task, err error) {
return s.mysql.TaskFromDB(c, tid)
}
// TxSubmitTask 提交任务
func (s *Service) TxSubmitTask(c context.Context, ormTx *gorm.DB, opt *common.BaseOptions, state int8) (ostate int8, otaskid, ouid int64, err error) {
var (
t *taskmod.Task
rows int64
)
// 根据rid,flowid检索最新的未完成任务
if t, err = s.gorm.TaskByRID(c, opt.RID, opt.FlowID); err != nil || t == nil || t.ID == 0 {
log.Warn("TaskByRID(%d,%d) miss(%v)", opt.RID, opt.FlowID, err)
t, err = s.gorm.TaskByRID(c, opt.RID, 0)
}
// TODO 先默认一个资源只在一个flow下分发解决目前存在flow状态与task状态不同步
if err != nil || t == nil || t.ID == 0 {
log.Warn("s.gorm.TaskByRID(%d,%d) miss(%v)", opt.RID, 0, err)
err = nil
return
}
ostate = t.State
ouid = t.UID
otaskid = t.ID
var utime uint64
if !t.Gtime.Time().IsZero() {
utime = uint64(time.Since(t.Gtime.Time()).Seconds())
}
subopt := &taskmod.SubmitOptions{
BaseOptions: *opt,
TaskID: t.ID,
OldUID: t.UID,
Utime: utime,
OldState: t.State,
}
// 1. 改数据库
if rows, err = s.gorm.TxSubmit(ormTx, subopt, state); err != nil {
return
}
if rows != 1 {
err = ecode.NothingFound
log.Error("Submit (%v) error(%v)", opt, err)
return
}
return
}
func (s *Service) submitTaskCache(c context.Context, opt *common.BaseOptions, ostate int8, taskid, ouid int64) {
log.Info("SubmitTaskCache opt(%+v) ostate(%d) taskid(%d) ouid(%d)", opt, ostate, taskid, ouid)
optc := &common.BaseOptions{
BusinessID: opt.BusinessID,
FlowID: opt.FlowID,
UID: ouid,
}
if ostate == taskmod.TaskStateDelay {
s.redis.RemoveDelayTask(c, optc, taskid)
return
}
s.redis.RemovePersonalTask(c, optc, taskid)
}
// Delay 延迟任务
func (s *Service) Delay(c context.Context, opt *taskmod.DelayOptions) (err error) {
var (
taskmod *taskmod.Task
rows int64
)
if taskmod, err = s.mysql.TaskFromDB(c, opt.TaskID); err != nil || taskmod == nil {
return
}
if !s.checkDelayOption(c, opt, taskmod) {
log.Error("checkDelayOption error opt(%+v) taskmod(%+v)", opt, taskmod)
return ecode.AegisTaskFinish
}
if rows, err = s.mysql.Delay(c, opt); err != nil {
return
}
if rows != 1 {
err = ecode.AegisTaskFinish
log.Error("Submit (%v) error(%v)", opt, err)
return
}
if err = s.redis.RemovePersonalTask(c, &opt.BaseOptions, opt.TaskID); err != nil {
return
}
s.redis.PushDelayTask(c, &opt.BaseOptions, opt.TaskID)
return
}
// Release 释放任务
func (s *Service) Release(c context.Context, opt *common.BaseOptions, delay bool) (rows int64, err error) {
if rows, err = s.mysql.Release(c, opt, delay); err != nil {
return
}
//err = s.redis.Release(c, opt, delay)
return
}
// MaxWeight 当前最高权重
func (s *Service) MaxWeight(c context.Context, opt *common.BaseOptions) (max int64, err error) {
return s.gorm.MaxWeight(c, opt.BusinessID, opt.FlowID)
}
// UnDoStat undo stat
func (s *Service) UnDoStat(c context.Context, opt *common.BaseOptions) (stat *taskmod.UnDOStat, err error) {
return s.gorm.UndoStat(c, opt.BusinessID, opt.FlowID, opt.UID)
}
// TaskStat task stat
func (s *Service) TaskStat(c context.Context, opt *common.BaseOptions) (stat *taskmod.Stat, err error) {
return s.gorm.TaskStat(c, opt.BusinessID, opt.FlowID, opt.UID)
}
func (s *Service) countPersonalTask(c context.Context, opt *common.BaseOptions, nocache bool) (count int64, err error) {
log.Info("task-countPersonalTask opt(%+v) nocache(%v)", opt, nocache)
defer func() { log.Info("task-countPersonalTask count(%d) err(%v)", count, err) }()
if nocache {
return s.mysql.CountPersonal(c, opt)
}
if count, err = s.redis.CountPersonalTask(c, opt); err != nil {
// redis 挂了
if count, err = s.mysql.CountPersonal(c, opt); err != nil {
return
}
}
return
}
func (s *Service) syncSeize(c context.Context, opt *taskmod.NextOptions) (count int64, err error) {
return s.seize(c, opt)
}
func (s *Service) seize(c context.Context, opt *taskmod.NextOptions) (count int64, err error) {
log.Info("task-seize opt(%+v)", opt)
defer func() { log.Info("task-seize count(%d) err(%v)", count, err) }()
var (
hitids, missids []int64
others map[int64]int64
)
// TODO: 抢占任务要根据用户是否在线,处理任务指派
if opt.NoCache {
hitids, err = s.mysql.QueryForSeize(c, opt.BusinessID, opt.FlowID, opt.UID, opt.SeizeCount)
} else {
hitids, missids, others, err = s.redis.SeizeTask(c, opt.BusinessID, opt.FlowID, opt.UID, opt.SeizeCount)
if err != nil {
hitids, err = s.mysql.QueryForSeize(c, opt.BusinessID, opt.FlowID, opt.UID, opt.SeizeCount)
}
}
if err != nil {
return
}
log.Info("seize uid(%d) hitids(%v), missids(%v), others(%v)", opt.UID, hitids, missids, others)
if !opt.NoCache && len(missids) > 0 {
log.Error("seize uid(%d) missids(%v)", opt.UID, missids)
for _, id := range missids {
if err = s.syncTask(c, id); err != nil {
s.redis.RemovePublicTask(c, &opt.BaseOptions, id)
}
}
}
if len(hitids) > 0 {
log.Info("seize uid(%d) hitids(%v)", opt.UID, hitids)
mhits := make(map[int64]int64)
for _, id := range hitids {
mhits[id] = opt.UID
}
if count, err = s.mysql.Seize(c, mhits); err != nil || count == 0 {
return
}
return
}
return
}
func (s *Service) dispatch(c context.Context, opt *taskmod.NextOptions) (tasks []*taskmod.Task, count int64, err error) {
log.Info("task-dispatch opt(%+v)", opt)
defer func() { log.Info("task-dispatch tasks(%+v) count(%d) err(%v)", tasks, count, err) }()
listopt := &taskmod.ListOptions{
BaseOptions: opt.BaseOptions,
Pager: common.Pager{
Pn: 1,
Ps: int(opt.DispatchCount),
}}
tasks, count, err = s.calibur(c, listopt, s.redis.RangePersonalTask, s.mysql.DispatchByID, s.redis.RemovePersonalTask)
if err != nil {
tasks, count, err = s.mysql.DBDispatch(c, opt)
}
return
}
func (s *Service) syncTask(c context.Context, taskID int64) (err error) {
var task *taskmod.Task
if task, err = s.mysql.TaskFromDB(c, taskID); err != nil || task == nil {
return ErrTaskMiss
}
var option = &common.BaseOptions{
BusinessID: task.BusinessID,
FlowID: task.FlowID,
UID: task.UID,
}
s.redis.SetTask(c, task)
switch task.State {
case taskmod.TaskStateInit:
s.redis.PushPublicTask(c, task)
case taskmod.TaskStateDispatch:
s.redis.RemovePublicTask(c, option, task.ID)
s.redis.PushPersonalTask(c, option, task.ID)
case taskmod.TaskStateDelay:
s.redis.RemovePublicTask(c, option, task.ID)
s.redis.PushDelayTask(c, option, task.ID)
default:
s.redis.RemovePublicTask(c, option, task.ID)
}
return
}
func (s *Service) listUnseized(c context.Context, opt *taskmod.ListOptions) (tasks []*taskmod.Task, count int64, err error) {
return s.calibur(c, opt, s.redis.RangePublicTask, s.mysql.ListCheckUnSeized, s.redis.RemovePublicTask)
}
func (s *Service) listMyTasks(c context.Context, ltp string, opt *taskmod.ListOptions) (tasks []*taskmod.Task, count int64, err error) {
if !opt.BisLeader {
if ltp == "delayd" {
return s.calibur(c, opt, s.redis.RangeDealyTask, s.mysql.ListCheckDelay, s.redis.RemoveDelayTask)
}
if ltp == "seized" {
return s.calibur(c, opt, s.redis.RangePersonalTask, s.mysql.ListCheckSeized, s.redis.RemovePersonalTask)
}
}
if opt.BisLeader {
opt.UID = 0
}
var ids []int64
switch ltp {
case "delayd":
ids, count, err = s.gorm.TaskListDelayd(c, opt)
case "seized":
ids, count, err = s.gorm.TaskListSeized(c, opt)
case "assignd":
ids, count, err = s.gorm.TaskListAssignd(c, opt)
}
if err != nil || len(ids) == 0 {
return
}
if tasks, err = s.redis.GetTask(c, ids); err != nil {
err = redis.ErrNil
}
return
}
func (s *Service) calibur(c context.Context, opt *taskmod.ListOptions, rfunc taskmod.RangeFunc, lfunc taskmod.ListFuncDB, remove taskmod.RemoveFunc) (taskmods []*taskmod.Task, count int64, err error) {
var (
hitids, missids []int64
missmap map[int64]struct{}
mtaskmods map[int64]*taskmod.Task
)
mtaskmods, count, hitids, missids, err = rfunc(c, opt)
log.Info("calibur(%+v) rfunc count(%d) hitids(%v) missids(%v)", opt, count, hitids, missids)
if err != nil {
return
}
if len(missids) > 0 {
for _, id := range missids {
if err = s.syncTask(c, id); err != nil {
log.Error("syncTask error(%v)", err)
remove(c, &opt.BaseOptions, id)
}
}
}
if len(hitids) > 0 {
if missmap, err = lfunc(c, mtaskmods, hitids, opt.UID); err != nil {
log.Error("calibur lfunc error(%v)", err)
return
}
if len(missmap) > 0 {
log.Info("calibur personal任务移除%v", missmap)
for id := range missmap {
remove(c, &opt.BaseOptions, id)
}
}
}
for _, id := range hitids {
if _, ok := missmap[id]; ok && opt.Action != "release" {
delete(mtaskmods, id)
} else {
taskmods = append(taskmods, mtaskmods[id])
}
}
return
}
/*
func (s *Service) checkSubmitOption(c context.Context, opt *taskmod.SubmitOptions, task *taskmod.Task) bool {
opt.OldState = task.State
opt.OldUID = task.UID
// 1. 组员只能处理自己的延迟任务
if task.State == taskmod.TaskStateDelay {
if opt.BisLeader {
return true
}
if task.UID != opt.UID {
return false
}
}
if task.State == taskmod.TaskStateDispatch && opt.UID == task.UID {
opt.Utime = uint64(time.Since(task.Gtime.Time()).Seconds())
return true
}
return false
}
*/
func (s *Service) checkDelayOption(c context.Context, opt *taskmod.DelayOptions, task *taskmod.Task) bool {
if task.State == taskmod.TaskStateDispatch && task.UID == opt.UID {
return true
}
return false
}
func (s *Service) syncUpCache(c context.Context) (err error) {
if s.Debug() == "local" {
return
}
upGroup := make(map[int64]*common.Group)
upgs, err := s.rpc.UpGroups(c)
if err != nil || upgs == nil {
return
}
for gid, upg := range upgs {
if _, ok := upGroup[gid]; !ok {
upGroup[gid] = &common.Group{
ID: gid,
Name: upg.Name,
Note: upg.Note,
Tag: upg.Tag,
FontColor: upg.FontColor,
BgColor: upg.BgColor,
}
log.Info("groupCache upg(%+v) upGroup(%+v)", upg, upGroup[gid])
}
}
s.groupCache = upGroup
log.Info("groupCache(%+v)", s.groupCache)
return
}