Skip to content

Commit 86cd961

Browse files
authored
Merge pull request #2350 from actiontech/fix-issue1441-4
Fix issue1441 4
2 parents 3ad56b0 + b1cdb24 commit 86cd961

File tree

10 files changed

+209
-0
lines changed

10 files changed

+209
-0
lines changed

sqle/api/controller/v2/workflow.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,6 +1021,12 @@ func UpdateWorkflowScheduleV2(c echo.Context) error {
10211021
return controller.JSONBaseErrorReq(c, err)
10221022
}
10231023

1024+
if req.IsNotify != nil && *req.IsNotify && req.NotifyType != nil {
1025+
if err := createNotifyRecord(*req.NotifyType, curTaskRecord); err != nil {
1026+
return controller.JSONBaseErrorReq(c, err)
1027+
}
1028+
}
1029+
10241030
return c.JSON(http.StatusOK, controller.NewBaseReq(nil))
10251031
}
10261032

sqle/api/controller/v2/workflow_ce.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
//go:build !enterprise
2+
// +build !enterprise
3+
4+
package v2
5+
6+
import "github.com/actiontech/sqle/sqle/model"
7+
8+
func createNotifyRecord(notifyType string, curTaskRecord *model.WorkflowInstanceRecord) error {
9+
// nothing
10+
return nil
11+
}

sqle/model/configuration.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,3 +285,70 @@ func (s *Storage) GetFeishuInstByStatus(status string) ([]FeishuInstance, error)
285285
}
286286
return feishuInst, nil
287287
}
288+
289+
type WechatOAStatus int
290+
291+
const (
292+
INITIALIZED WechatOAStatus = 1
293+
APPROVED WechatOAStatus = 2
294+
REJECTED WechatOAStatus = 3
295+
)
296+
297+
type WechatRecord struct {
298+
Model
299+
TaskId uint `json:"task_id" gorm:"column:task_id"`
300+
OaResult string `json:"oa_result" gorm:"column:oa_result;default:\"INITIALIZED\""`
301+
SpNo string `json:"sp_no" gorm:"column:sp_no"`
302+
303+
Task *Task `gorm:"foreignkey:TaskId"`
304+
}
305+
306+
func (s *Storage) GetWechatRecordByStatus(status string) ([]WechatRecord, error) {
307+
var wcRecords []WechatRecord
308+
err := s.db.Where("oa_result = ?", status).Find(&wcRecords).Error
309+
if err != nil {
310+
return nil, err
311+
}
312+
return wcRecords, nil
313+
}
314+
315+
func (s *Storage) WechatCancelScheduledTask(w WechatRecord) error {
316+
err := s.db.Transaction(func(tx *gorm.DB) error {
317+
// 取消该task对应的定时上线任务,将WorkflowInstanceRecord表中的ScheduledAt字段设置为null
318+
insRecord := WorkflowInstanceRecord{}
319+
err := s.db.Where("task_id = ?", w.TaskId).Find(&insRecord).Error
320+
if err != nil {
321+
return err
322+
}
323+
insRecord.ScheduledAt = nil
324+
err = s.Save(&insRecord)
325+
if err != nil {
326+
return err
327+
}
328+
329+
w.OaResult = ApproveStatusRefuse
330+
err = s.Save(&w)
331+
if err != nil {
332+
return err
333+
}
334+
return nil
335+
})
336+
return err
337+
}
338+
339+
func (s *Storage) GetWechatRecordByTaskId(taskId uint) (*WechatRecord, error) {
340+
var wcRecord WechatRecord
341+
err := s.db.Where("task_id = ?", taskId).Find(&wcRecord).Error
342+
if err != nil {
343+
return nil, err
344+
}
345+
return &wcRecord, nil
346+
}
347+
348+
func (s *Storage) UpdateWechatRecordByTaskId(taskId uint, m map[string]interface{}) error {
349+
err := s.db.Model(&WechatRecord{}).Where("task_id = ?", taskId).Updates(m).Error
350+
if err != nil {
351+
return errors.New(errors.ConnectStorageError, err)
352+
}
353+
return nil
354+
}

sqle/model/utils.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ var autoMigrateList = []interface{}{
149149
&CompanyNotice{},
150150
&SqlManageEndpoint{},
151151
&SQLDevRecord{},
152+
&WechatRecord{},
152153
}
153154

154155
func (s *Storage) AutoMigrate() error {

sqle/model/workflow.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,23 @@ type WorkflowInstanceRecord struct {
237237
// 用于区分工单处于上线步骤时,某个数据源是否已上线,因为数据源可以分批上线
238238
IsSQLExecuted bool
239239
ExecutionUserId string
240+
// 定时上线是否需要发生通知
241+
NeedScheduledTaskNotify bool
240242

241243
Instance *Instance `gorm:"foreignkey:InstanceId"`
242244
Task *Task `gorm:"foreignkey:TaskId"`
243245
// User *User `gorm:"foreignkey:ExecutionUserId"`
244246
ExecutionAssignees string
245247
}
246248

249+
func (s *Storage) UpdateWorkflowInstanceRecordById(id uint, m map[string]interface{}) error {
250+
err := s.db.Model(&WorkflowInstanceRecord{}).Where("id = ?", id).Updates(m).Error
251+
if err != nil {
252+
return errors.New(errors.ConnectStorageError, err)
253+
}
254+
return nil
255+
}
256+
247257
func (s *Storage) GetWorkInstanceRecordByTaskId(id string) (instanceRecord WorkflowInstanceRecord, err error) {
248258
return instanceRecord, s.db.Where("task_id = ?", id).First(&instanceRecord).Error
249259
}

sqle/pkg/im/im.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,55 @@ func BatchCancelApprove(workflowIds []string, user *model.User) {
187187
}
188188
}
189189
}
190+
191+
func CreateScheduledApprove(taskId uint, projectId, workflowId string) {
192+
newLog := log.NewEntry()
193+
s := model.GetStorage()
194+
195+
workflow, err := dms.GetWorkflowDetailByWorkflowId(projectId, workflowId, s.GetWorkflowDetailWithoutInstancesByWorkflowID)
196+
if err != nil {
197+
newLog.Errorf("get workflow error: %v", err)
198+
}
199+
assignUserIds := workflow.CurrentAssigneeUser()
200+
201+
assignUsers, err := dms.GetUsers(context.TODO(), assignUserIds, controller.GetDMSServerAddress())
202+
if err != nil {
203+
newLog.Errorf("get user error: %v", err)
204+
return
205+
}
206+
207+
ims, err := s.GetAllIMConfig()
208+
if err != nil {
209+
newLog.Errorf("get im config error: %v", err)
210+
return
211+
}
212+
213+
for _, im := range ims {
214+
if !im.IsEnable {
215+
continue
216+
}
217+
218+
systemVariables, err := s.GetAllSystemVariables()
219+
if err != nil {
220+
newLog.Errorf("get sqle url system variables error: %v", err)
221+
continue
222+
}
223+
224+
sqleUrl := systemVariables[model.SystemVariableSqleUrl].Value
225+
workflowUrl := fmt.Sprintf("%v/project/%s/order/%s", sqleUrl, workflow.ProjectId, workflow.WorkflowId)
226+
if sqleUrl == "" {
227+
newLog.Errorf("sqle url is empty")
228+
workflowUrl = ""
229+
}
230+
231+
switch im.Type {
232+
case model.ImTypeWechatAudit:
233+
if err := CreateWechatAuditRecord(context.TODO(), im, workflow, assignUsers, workflowUrl, taskId); err != nil {
234+
newLog.Errorf("create wechat audit error: %v", err)
235+
continue
236+
}
237+
default:
238+
newLog.Errorf("im type %s not found", im.Type)
239+
}
240+
}
241+
}

sqle/pkg/im/im_ce.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,7 @@ func CancelDingdingAuditInst(ctx context.Context, im model.IM, workflowIDs []str
4949
func CreateWechatAuditTemplate(ctx context.Context, im model.IM) error {
5050
return ErrCommunityEditionNotSupportWechatAudit
5151
}
52+
53+
func CreateWechatAuditRecord(ctx context.Context, im model.IM, workflow *model.Workflow, assignUsers []*model.User, url string, taskId uint) error {
54+
return ErrCommunityEditionNotSupportWechatAudit
55+
}

sqle/server/manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ var OnlyRunOnLeaderJobs = []func(entry *logrus.Entry) ServerJob{
1717
NewCleanJob,
1818
NewDingTalkJob,
1919
NewFeishuJob,
20+
NewWechatJob,
2021
}
2122

2223
var RunOnAllJobs = []func(entry *logrus.Entry) ServerJob{

sqle/server/wechat_ce.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//go:build !enterprise
2+
// +build !enterprise
3+
4+
package server
5+
6+
import (
7+
"time"
8+
9+
"github.com/sirupsen/logrus"
10+
)
11+
12+
type WechatJob struct {
13+
BaseJob
14+
}
15+
16+
func NewWechatJob(entry *logrus.Entry) ServerJob {
17+
w := new(WechatJob)
18+
w.BaseJob = *NewBaseJob(entry, 60*time.Second, w.wechatRotation)
19+
return w
20+
}
21+
22+
// 当前企微轮询只为二次确认工单定时上线功能
23+
// https://github.com/actiontech/sqle-ee/issues/1441
24+
func (w *WechatJob) wechatRotation(entry *logrus.Entry) {
25+
// nothing
26+
}

sqle/server/workflow_schedule.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/actiontech/sqle/sqle/log"
1616
"github.com/actiontech/sqle/sqle/model"
1717
"github.com/actiontech/sqle/sqle/notification"
18+
"github.com/actiontech/sqle/sqle/pkg/im"
1819
"github.com/sirupsen/logrus"
1920
)
2021

@@ -33,13 +34,17 @@ func NewWorkflowScheduleJob(entry *logrus.Entry) ServerJob {
3334

3435
func (j *WorkflowScheduleJob) WorkflowSchedule(entry *logrus.Entry) {
3536
st := model.GetStorage()
37+
38+
entry.Error("start WorkflowSchedule")
3639
workflows, err := st.GetNeedScheduledWorkflows()
3740
if err != nil {
3841
entry.Errorf("get need scheduled workflows from storage error: %v", err)
3942
return
4043
}
44+
entry.Errorf("start WorkflowSchedule workflow:%v", workflows)
4145
now := time.Now()
4246
for _, workflow := range workflows {
47+
entry.Errorf("get workflow id %v", workflow.WorkflowId)
4348
w, err := dms.GetWorkflowDetailByWorkflowId(string(workflow.ProjectId), workflow.WorkflowId, st.GetWorkflowDetailWithoutInstancesByWorkflowID)
4449
if err != nil {
4550
entry.Errorf("get workflow from storage error: %v", err)
@@ -60,6 +65,17 @@ func (j *WorkflowScheduleJob) WorkflowSchedule(entry *logrus.Entry) {
6065
needExecuteTaskIds := map[uint]string{}
6166
for _, ir := range w.Record.InstanceRecords {
6267
if !ir.IsSQLExecuted && ir.ScheduledAt != nil && ir.ScheduledAt.Before(now) {
68+
if ir.NeedScheduledTaskNotify {
69+
isOaAgree, err := getOaApproveResult(ir.TaskId, workflow)
70+
71+
if err != nil {
72+
entry.Error(err)
73+
continue
74+
}
75+
if !isOaAgree {
76+
continue
77+
}
78+
}
6379
needExecuteTaskIds[ir.TaskId] = ir.ScheduleUserId
6480
}
6581
}
@@ -75,6 +91,21 @@ func (j *WorkflowScheduleJob) WorkflowSchedule(entry *logrus.Entry) {
7591
}
7692
}
7793

94+
func getOaApproveResult(taskId uint, workflow *model.Workflow) (bool, error) {
95+
s := model.GetStorage()
96+
record, err := s.GetWechatRecordByTaskId(taskId)
97+
if err != nil {
98+
return false, err
99+
}
100+
if record.SpNo == "" {
101+
go im.CreateScheduledApprove(taskId, string(workflow.ProjectId), workflow.WorkflowId)
102+
return false, nil
103+
} else if record.OaResult == model.ApproveStatusAgree {
104+
return true, nil
105+
}
106+
return false, nil
107+
}
108+
78109
func ExecuteWorkflow(workflow *model.Workflow, needExecTaskIdToUserId map[uint]string) error {
79110
s := model.GetStorage()
80111

0 commit comments

Comments
 (0)