Skip to content

Commit b1cdb24

Browse files
committed
Schedule Rotation
1 parent bf0ab26 commit b1cdb24

File tree

3 files changed

+58
-0
lines changed

3 files changed

+58
-0
lines changed

sqle/server/manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ var OnlyRunOnLeaderJobs = []func(entry *logrus.Entry) ServerJob{
1717
NewCleanJob,
1818
NewDingTalkJob,
1919
NewFeishuJob,
20+
NewWechatJob,
2021
}
2122

2223
var RunOnAllJobs = []func(entry *logrus.Entry) ServerJob{

sqle/server/wechat_ce.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//go:build !enterprise
2+
// +build !enterprise
3+
4+
package server
5+
6+
import (
7+
"time"
8+
9+
"github.com/sirupsen/logrus"
10+
)
11+
12+
type WechatJob struct {
13+
BaseJob
14+
}
15+
16+
func NewWechatJob(entry *logrus.Entry) ServerJob {
17+
w := new(WechatJob)
18+
w.BaseJob = *NewBaseJob(entry, 60*time.Second, w.wechatRotation)
19+
return w
20+
}
21+
22+
// 当前企微轮询只为二次确认工单定时上线功能
23+
// https://github.com/actiontech/sqle-ee/issues/1441
24+
func (w *WechatJob) wechatRotation(entry *logrus.Entry) {
25+
// nothing
26+
}

sqle/server/workflow_schedule.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/actiontech/sqle/sqle/log"
1616
"github.com/actiontech/sqle/sqle/model"
1717
"github.com/actiontech/sqle/sqle/notification"
18+
"github.com/actiontech/sqle/sqle/pkg/im"
1819
"github.com/sirupsen/logrus"
1920
)
2021

@@ -33,13 +34,17 @@ func NewWorkflowScheduleJob(entry *logrus.Entry) ServerJob {
3334

3435
func (j *WorkflowScheduleJob) WorkflowSchedule(entry *logrus.Entry) {
3536
st := model.GetStorage()
37+
38+
entry.Error("start WorkflowSchedule")
3639
workflows, err := st.GetNeedScheduledWorkflows()
3740
if err != nil {
3841
entry.Errorf("get need scheduled workflows from storage error: %v", err)
3942
return
4043
}
44+
entry.Errorf("start WorkflowSchedule workflow:%v", workflows)
4145
now := time.Now()
4246
for _, workflow := range workflows {
47+
entry.Errorf("get workflow id %v", workflow.WorkflowId)
4348
w, err := dms.GetWorkflowDetailByWorkflowId(string(workflow.ProjectId), workflow.WorkflowId, st.GetWorkflowDetailWithoutInstancesByWorkflowID)
4449
if err != nil {
4550
entry.Errorf("get workflow from storage error: %v", err)
@@ -60,6 +65,17 @@ func (j *WorkflowScheduleJob) WorkflowSchedule(entry *logrus.Entry) {
6065
needExecuteTaskIds := map[uint]string{}
6166
for _, ir := range w.Record.InstanceRecords {
6267
if !ir.IsSQLExecuted && ir.ScheduledAt != nil && ir.ScheduledAt.Before(now) {
68+
if ir.NeedScheduledTaskNotify {
69+
isOaAgree, err := getOaApproveResult(ir.TaskId, workflow)
70+
71+
if err != nil {
72+
entry.Error(err)
73+
continue
74+
}
75+
if !isOaAgree {
76+
continue
77+
}
78+
}
6379
needExecuteTaskIds[ir.TaskId] = ir.ScheduleUserId
6480
}
6581
}
@@ -75,6 +91,21 @@ func (j *WorkflowScheduleJob) WorkflowSchedule(entry *logrus.Entry) {
7591
}
7692
}
7793

94+
func getOaApproveResult(taskId uint, workflow *model.Workflow) (bool, error) {
95+
s := model.GetStorage()
96+
record, err := s.GetWechatRecordByTaskId(taskId)
97+
if err != nil {
98+
return false, err
99+
}
100+
if record.SpNo == "" {
101+
go im.CreateScheduledApprove(taskId, string(workflow.ProjectId), workflow.WorkflowId)
102+
return false, nil
103+
} else if record.OaResult == model.ApproveStatusAgree {
104+
return true, nil
105+
}
106+
return false, nil
107+
}
108+
78109
func ExecuteWorkflow(workflow *model.Workflow, needExecTaskIdToUserId map[uint]string) error {
79110
s := model.GetStorage()
80111

0 commit comments

Comments
 (0)