Skip to content

Commit 8f0f709

Browse files
Merge pull request #2589 from actiontech/audit_plan_sql_audit
Audit plan sql audit optimize
2 parents e7afe89 + 7133c9f commit 8f0f709

File tree

8 files changed

+92
-40
lines changed

8 files changed

+92
-40
lines changed

sqle/api/controller/v2/sql_manage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type SqlManage struct {
2222
InstanceName string `json:"instance_name"`
2323
SchemaName string `json:"schema_name"`
2424
AuditResult []*v1.AuditResult `json:"audit_result"`
25+
AuditStatus string `json:"audit_status" enums:"being_audited"`
2526
FirstAppearTimeStamp string `json:"first_appear_timestamp"`
2627
LastReceiveTimeStamp string `json:"last_receive_timestamp"`
2728
FpCount uint64 `json:"fp_count"`

sqle/docs/docs.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17015,6 +17015,12 @@ var doc = `{
1701517015
"$ref": "#/definitions/v1.AuditResult"
1701617016
}
1701717017
},
17018+
"audit_status": {
17019+
"type": "string",
17020+
"enum": [
17021+
"being_audited"
17022+
]
17023+
},
1701817024
"endpoints": {
1701917025
"type": "string"
1702017026
},

sqle/docs/swagger.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16999,6 +16999,12 @@
1699916999
"$ref": "#/definitions/v1.AuditResult"
1700017000
}
1700117001
},
17002+
"audit_status": {
17003+
"type": "string",
17004+
"enum": [
17005+
"being_audited"
17006+
]
17007+
},
1700217008
"endpoints": {
1700317009
"type": "string"
1700417010
},

sqle/docs/swagger.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4947,6 +4947,10 @@ definitions:
49474947
items:
49484948
$ref: '#/definitions/v1.AuditResult'
49494949
type: array
4950+
audit_status:
4951+
enum:
4952+
- being_audited
4953+
type: string
49504954
endpoints:
49514955
type: string
49524956
first_appear_timestamp:

sqle/model/instance_audit_plan.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -481,22 +481,31 @@ func (s *Storage) PullSQLFromManagerSQLQueue() ([]*SQLManageQueue, error) {
481481
return sqls, err
482482
}
483483

484-
func (s *Storage) RemoveSQLFromQueue(sql *SQLManageQueue) error {
485-
return s.db.Unscoped().Delete(sql).Error
484+
func (s *Storage) RemoveSQLFromQueue(txDB *gorm.DB, sql *SQLManageQueue) error {
485+
return txDB.Unscoped().Delete(sql).Error
486486
}
487487

488-
func (s *Storage) UpdateManagerSQL(sql *SQLManageRecord) error {
489-
const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`, `audit_level`, `audit_results`,`priority`) " +
490-
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), `priority` = VALUES(priority), " +
491-
"`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info), `audit_level`= VALUES(audit_level), `audit_results`= VALUES(audit_results)"
492-
return s.db.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info, sql.AuditLevel, sql.AuditResults, sql.Priority).Error
488+
func (s *Storage) SaveManagerSQL(txDB *gorm.DB, sql *SQLManageRecord) error {
489+
const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`) " +
490+
"VALUES (?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), " +
491+
"`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info)"
492+
return txDB.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info).Error
493493
}
494494

495-
func (s *Storage) UpdateManagerSQLStatus(sql *SQLManageRecord) error {
495+
func (s *Storage) UpdateManagerSQLStatus(txDB *gorm.DB, sql *SQLManageRecord) error {
496496
const query = ` INSERT INTO sql_manage_record_processes (sql_manage_record_id)
497497
SELECT smr.id FROM sql_manage_records smr WHERE smr.sql_id = ?
498498
ON DUPLICATE KEY UPDATE sql_manage_record_id = VALUES(sql_manage_record_id);`
499-
return s.db.Exec(query, sql.SQLID).Error
499+
return txDB.Exec(query, sql.SQLID).Error
500+
}
501+
502+
func (s *Storage) UpdateManagerSQLBySqlId(sqlManageMap map[string]interface{}, sqlId string) error {
503+
err := s.db.Model(&SQLManageRecord{}).Where("sql_id = ?", sqlId).
504+
Updates(sqlManageMap).Error
505+
if err != nil {
506+
return err
507+
}
508+
return nil
500509
}
501510

502511
func (s *Storage) UpdateAuditPlanLastCollectionTime(auditPlanID uint, collectionTime time.Time) error {

sqle/model/instance_audit_plan_list.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ audit_plan_sqls.sql_fingerprint,
169169
audit_plan_sqls.sql_text,
170170
audit_plan_sqls.schema_name,
171171
audit_plan_sqls.info,
172-
audit_plan_sqls.audit_results,
172+
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
173173
audit_plan_sqls.priority
174174
175175
{{- template "body" . -}}

sqle/model/instance_audit_plan_list_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ audit_plan_sqls.sql_fingerprint,
3232
audit_plan_sqls.sql_text,
3333
audit_plan_sqls.schema_name,
3434
audit_plan_sqls.info,
35-
audit_plan_sqls.audit_results,
35+
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
3636
audit_plan_sqls.priority
3737
3838
FROM sql_manage_records AS audit_plan_sqls
@@ -59,7 +59,7 @@ audit_plan_sqls.sql_fingerprint,
5959
audit_plan_sqls.sql_text,
6060
audit_plan_sqls.schema_name,
6161
audit_plan_sqls.info,
62-
audit_plan_sqls.audit_results,
62+
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
6363
audit_plan_sqls.priority
6464
6565
FROM sql_manage_records AS audit_plan_sqls
@@ -87,7 +87,7 @@ audit_plan_sqls.sql_fingerprint,
8787
audit_plan_sqls.sql_text,
8888
audit_plan_sqls.schema_name,
8989
audit_plan_sqls.info,
90-
audit_plan_sqls.audit_results,
90+
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
9191
audit_plan_sqls.priority
9292
9393
FROM sql_manage_records AS audit_plan_sqls
@@ -115,7 +115,7 @@ audit_plan_sqls.sql_fingerprint,
115115
audit_plan_sqls.sql_text,
116116
audit_plan_sqls.schema_name,
117117
audit_plan_sqls.info,
118-
audit_plan_sqls.audit_results,
118+
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
119119
audit_plan_sqls.priority
120120
121121
FROM sql_manage_records AS audit_plan_sqls
@@ -144,7 +144,7 @@ audit_plan_sqls.sql_fingerprint,
144144
audit_plan_sqls.sql_text,
145145
audit_plan_sqls.schema_name,
146146
audit_plan_sqls.info,
147-
audit_plan_sqls.audit_results,
147+
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
148148
audit_plan_sqls.priority
149149
150150
FROM sql_manage_records AS audit_plan_sqls
@@ -172,7 +172,7 @@ audit_plan_sqls.sql_fingerprint,
172172
audit_plan_sqls.sql_text,
173173
audit_plan_sqls.schema_name,
174174
audit_plan_sqls.info,
175-
audit_plan_sqls.audit_results,
175+
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
176176
audit_plan_sqls.priority
177177
178178
FROM sql_manage_records AS audit_plan_sqls

sqle/server/auditplan/job_task_handler.go

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/actiontech/sqle/sqle/model"
1111
"github.com/actiontech/sqle/sqle/server"
1212
"github.com/sirupsen/logrus"
13+
"gorm.io/gorm"
1314
)
1415

1516
type AuditPlanHandlerJob struct {
@@ -61,38 +62,63 @@ func (j *AuditPlanHandlerJob) HandlerSQL(entry *logrus.Entry) {
6162
if len(sqlList) == 0 {
6263
return
6364
}
64-
// 审核
65-
sqlList, err = BatchAuditSQLs(sqlList, true)
66-
if err != nil {
67-
entry.Warnf("batch audit origin manager sql failed, error: %v", err)
65+
66+
// todo: 错误处理
67+
if err = s.Tx(func(txDB *gorm.DB) error {
68+
for _, sql := range sqlList {
69+
err := s.SaveManagerSQL(txDB, sql)
70+
if err != nil {
71+
entry.Warnf("update manager sql failed, error: %v", err)
72+
return err
73+
}
74+
75+
// 更新状态表
76+
err = s.UpdateManagerSQLStatus(txDB, sql)
77+
if err != nil {
78+
entry.Warnf("update manager sql status failed, error: %v", err)
79+
return err
80+
}
81+
}
82+
83+
for _, sql := range queues {
84+
err := s.RemoveSQLFromQueue(txDB, sql)
85+
if err != nil {
86+
entry.Warnf("remove manager sql queue failed, error: %v", err)
87+
return err
88+
}
89+
}
90+
91+
return nil
92+
93+
}); err != nil {
6894
return
6995
}
96+
97+
go handlerSQLAudit(entry, sqlList)
98+
99+
}
100+
101+
// todo: 错误处理
102+
func handlerSQLAudit(entry *logrus.Entry, sqlList []*model.SQLManageRecord) {
103+
s := model.GetStorage()
104+
sqlList, err := BatchAuditSQLs(sqlList, true)
105+
if err != nil {
106+
entry.Warnf("batch audit manager sql failed, error: %v", err)
107+
}
108+
// 设置高优先级
70109
sqlList, err = SetSQLPriority(sqlList)
71110
if err != nil {
72-
entry.Warnf("check sql priority sql failed, error: %v", err)
73-
return
111+
entry.Warnf("set sql priority sql failed, error: %v", err)
74112
}
75-
// todo: 保证事务和错误处理
76113
for _, sql := range sqlList {
77-
err := s.UpdateManagerSQL(sql)
114+
manageSqlParam := make(map[string]interface{}, 3)
115+
manageSqlParam["audit_level"] = sql.AuditLevel
116+
manageSqlParam["audit_results"] = sql.AuditResults
117+
manageSqlParam["priority"] = sql.Priority
118+
err = s.UpdateManagerSQLBySqlId(manageSqlParam, sql.SQLID)
78119
if err != nil {
79120
entry.Warnf("update manager sql failed, error: %v", err)
80-
return
81-
}
82-
83-
// 同时更新状态表
84-
err = s.UpdateManagerSQLStatus(sql)
85-
if err != nil {
86-
entry.Warnf("update manager sql status failed, error: %v", err)
87-
return
88-
}
89-
90-
}
91-
for _, sql := range queues {
92-
err := s.RemoveSQLFromQueue(sql)
93-
if err != nil {
94-
entry.Warnf("remove manager sql queue failed, error: %v", err)
95-
return
121+
continue
96122
}
97123
}
98124
}

0 commit comments

Comments
 (0)