-
Notifications
You must be signed in to change notification settings - Fork 125
feat(streaming): clickhouse query optimize #3030
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
57412be
a5085a5
55d8f2b
ad06cc5
16fd3b5
7cddba5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |||||||||||||||||||||||||||||||||||||||||||||
"fmt" | ||||||||||||||||||||||||||||||||||||||||||||||
"math" | ||||||||||||||||||||||||||||||||||||||||||||||
"sort" | ||||||||||||||||||||||||||||||||||||||||||||||
"strings" | ||||||||||||||||||||||||||||||||||||||||||||||
"time" | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver" | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -26,6 +27,7 @@ type queryMeter struct { | |||||||||||||||||||||||||||||||||||||||||||||
GroupBy []string | ||||||||||||||||||||||||||||||||||||||||||||||
WindowSize *meterpkg.WindowSize | ||||||||||||||||||||||||||||||||||||||||||||||
WindowTimeZone *time.Location | ||||||||||||||||||||||||||||||||||||||||||||||
QuerySettings map[string]string | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// from returns the from time for the query. | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -150,7 +152,7 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) { | |||||||||||||||||||||||||||||||||||||||||||||
// TODO: remove this when we don't round to the nearest minute anymore | ||||||||||||||||||||||||||||||||||||||||||||||
// We round them to the nearest minute to ensure the result is the same as with | ||||||||||||||||||||||||||||||||||||||||||||||
// streaming connector using materialized views with per minute windows | ||||||||||||||||||||||||||||||||||||||||||||||
selectColumn := fmt.Sprintf("tumbleStart(min(%s), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(%s), toIntervalMinute(1)) AS windowend", timeColumn, timeColumn) | ||||||||||||||||||||||||||||||||||||||||||||||
selectColumn := fmt.Sprintf("toStartOfMinute(min(%s)) AS windowstart, toStartOfMinute(max(%s)) + INTERVAL 1 MINUTE AS windowend", timeColumn, timeColumn) | ||||||||||||||||||||||||||||||||||||||||||||||
selectColumns = append(selectColumns, selectColumn) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -207,6 +209,9 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) { | |||||||||||||||||||||||||||||||||||||||||||||
query := sqlbuilder.ClickHouse.NewSelectBuilder() | ||||||||||||||||||||||||||||||||||||||||||||||
query.Select(selectColumns...) | ||||||||||||||||||||||||||||||||||||||||||||||
query.From(tableName) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// Prewhere clauses | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
query.Where(query.Equal(getColumn("namespace"), d.Namespace)) | ||||||||||||||||||||||||||||||||||||||||||||||
query.Where(query.Equal(getColumn("type"), d.Meter.EventType)) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -218,15 +223,32 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) { | |||||||||||||||||||||||||||||||||||||||||||||
query.Where(query.Or(slicesx.Map(d.Subject, mapFunc)...)) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// Apply the time where clause | ||||||||||||||||||||||||||||||||||||||||||||||
from := d.from() | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if from != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
query.Where(query.GreaterEqualThan(timeColumn, from.Unix())) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if d.To != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
query.Where(query.LessThan(timeColumn, d.To.Unix())) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
var sqlPreWhere string | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if len(d.FilterGroupBy) > 0 { | ||||||||||||||||||||||||||||||||||||||||||||||
// We sort the group by s to ensure the query is deterministic | ||||||||||||||||||||||||||||||||||||||||||||||
groupByKeys := make([]string, 0, len(d.FilterGroupBy)) | ||||||||||||||||||||||||||||||||||||||||||||||
sqlPreWhere, _ = query.Build() | ||||||||||||||||||||||||||||||||||||||||||||||
dataColumn := getColumn("data") | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// We sort the group bys to ensure the query is deterministic | ||||||||||||||||||||||||||||||||||||||||||||||
filterGroupByKeys := make([]string, 0, len(d.FilterGroupBy)) | ||||||||||||||||||||||||||||||||||||||||||||||
for k := range d.FilterGroupBy { | ||||||||||||||||||||||||||||||||||||||||||||||
groupByKeys = append(groupByKeys, k) | ||||||||||||||||||||||||||||||||||||||||||||||
filterGroupByKeys = append(filterGroupByKeys, k) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
sort.Strings(groupByKeys) | ||||||||||||||||||||||||||||||||||||||||||||||
sort.Strings(filterGroupByKeys) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
for _, groupByKey := range groupByKeys { | ||||||||||||||||||||||||||||||||||||||||||||||
// Where clauses | ||||||||||||||||||||||||||||||||||||||||||||||
for _, groupByKey := range filterGroupByKeys { | ||||||||||||||||||||||||||||||||||||||||||||||
if _, ok := d.Meter.GroupBy[groupByKey]; !ok { | ||||||||||||||||||||||||||||||||||||||||||||||
return "", nil, fmt.Errorf("meter does not have group by: %s", groupByKey) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -238,38 +260,46 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) { | |||||||||||||||||||||||||||||||||||||||||||||
return "", nil, fmt.Errorf("empty filter for group by: %s", groupByKey) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
mapFunc := func(value string) string { | ||||||||||||||||||||||||||||||||||||||||||||||
column := fmt.Sprintf("JSON_VALUE(%s, '%s')", getColumn("data"), groupByJSONPath) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// Subject is a special case | ||||||||||||||||||||||||||||||||||||||||||||||
if groupByKey == "subject" { | ||||||||||||||||||||||||||||||||||||||||||||||
column = "subject" | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
return fmt.Sprintf("%s = '%s'", column, sqlbuilder.Escape((value))) | ||||||||||||||||||||||||||||||||||||||||||||||
return fmt.Sprintf("JSON_VALUE(%s, '%s') = '%s'", dataColumn, groupByJSONPath, sqlbuilder.Escape((value))) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
query.Where(query.Or(slicesx.Map(values, mapFunc)...)) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// Apply the time where clause | ||||||||||||||||||||||||||||||||||||||||||||||
from := d.from() | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if from != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
query.Where(query.GreaterEqualThan(timeColumn, from.Unix())) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if d.To != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
query.Where(query.LessThan(timeColumn, d.To.Unix())) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// Group by | ||||||||||||||||||||||||||||||||||||||||||||||
query.GroupBy(groupByColumns...) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if groupByWindowSize { | ||||||||||||||||||||||||||||||||||||||||||||||
query.OrderBy("windowstart") | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
sql, args := query.Build() | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// Only add prewhere if there are filters on JSON data | ||||||||||||||||||||||||||||||||||||||||||||||
if sqlPreWhere != "" { | ||||||||||||||||||||||||||||||||||||||||||||||
sqlParts := strings.Split(sql, sqlPreWhere) | ||||||||||||||||||||||||||||||||||||||||||||||
sqlAfter := sqlParts[1] | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if strings.HasPrefix(sqlAfter, " AND") { | ||||||||||||||||||||||||||||||||||||||||||||||
sqlAfter = strings.Replace(sqlAfter, "AND", "WHERE", 1) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
sqlPreWhere = strings.Replace(sqlPreWhere, "WHERE", "PREWHERE", 1) | ||||||||||||||||||||||||||||||||||||||||||||||
sql = fmt.Sprintf("%s%s", sqlPreWhere, sqlAfter) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// Add settings | ||||||||||||||||||||||||||||||||||||||||||||||
settings := []string{ | ||||||||||||||||||||||||||||||||||||||||||||||
"optimize_move_to_prewhere = 1", | ||||||||||||||||||||||||||||||||||||||||||||||
"allow_reorder_prewhere_conditions = 1", | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
for key, value := range d.QuerySettings { | ||||||||||||||||||||||||||||||||||||||||||||||
settings = append(settings, fmt.Sprintf("%s = %s", key, value)) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
sql = sql + fmt.Sprintf(" SETTINGS %s", strings.Join(settings, ", ")) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+297
to
+302
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add validation for QuerySettings to prevent SQL injection. The QuerySettings map values are directly interpolated into the SQL query without any validation or escaping, which could lead to SQL injection vulnerabilities if these settings come from untrusted sources. Apply this diff to add basic validation: +// Validate query settings to prevent SQL injection
+for key, value := range d.QuerySettings {
+ // Validate key contains only alphanumeric and underscore
+ if !regexp.MustCompile(`^[a-zA-Z0-9_]+$`).MatchString(key) {
+ return "", nil, fmt.Errorf("invalid query setting key: %s", key)
+ }
+ // Validate value contains only safe characters
+ if !regexp.MustCompile(`^[a-zA-Z0-9_\s,=.]+$`).MatchString(value) {
+ return "", nil, fmt.Errorf("invalid query setting value for %s: %s", key, value)
+ }
+}
// Add settings
settings := []string{
"optimize_move_to_prewhere = 1",
"allow_reorder_prewhere_conditions = 1",
}
for key, value := range d.QuerySettings {
settings = append(settings, fmt.Sprintf("%s = %s", key, value))
} Alternatively, consider using a whitelist of allowed ClickHouse settings. 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||
return sql, args, nil | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor the fragile string manipulation for PREWHERE clause.
The current approach using string splitting and replacement is error-prone and could break if the query structure changes. This makes the code difficult to maintain.
Consider using a more robust approach:
Example approach:
🤖 Prompt for AI Agents