Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .dagger/versions_pinned.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

const (
kafkaVersion = "3.6"
clickhouseVersion = "24.5.5.78"
clickhouseVersion = "24.10"
redisVersion = "7.0.12"
postgresVersion = "14.9"
svixVersion = "v1.44"
Expand Down
1 change: 1 addition & 0 deletions app/common/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewStreamingConnector(
AsyncInsert: conf.AsyncInsert,
AsyncInsertWait: conf.AsyncInsertWait,
InsertQuerySettings: conf.InsertQuerySettings,
MeterQuerySettings: conf.MeterQuerySettings,
ProgressManager: progressmanager,
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions app/config/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type AggregationConfiguration struct {
// For example, you can set the `max_insert_threads` setting to control the number of threads
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
InsertQuerySettings map[string]string

// MeterQuerySettings are the settings applied to meter queries.
// For example, you can set the `enable_parallel_replicas` and `max_parallel_replicas` settings.
MeterQuerySettings map[string]string
}

// Validate validates the configuration.
Expand Down
2 changes: 1 addition & 1 deletion deploy/charts/openmeter/templates/clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ spec:
spec:
containers:
- name: clickhouse
image: clickhouse/clickhouse-server:23.3
image: clickhouse/clickhouse-server:24.10
volumeMounts:
- name: data-storage-vc-template
mountPath: /var/lib/clickhouse
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ services:
retries: 30

clickhouse:
image: clickhouse/clickhouse-server:24.9-alpine
image: clickhouse/clickhouse-server:24.10-alpine
ports:
- "127.0.0.1:8123:8123"
- "127.0.0.1:9000:9000"
Expand Down
2 changes: 1 addition & 1 deletion examples/collectors/database/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ services:
clickhouse:
profiles:
- clickhouse
image: clickhouse/clickhouse-server:23.8.9.54-alpine
image: clickhouse/clickhouse-server:24.0-alpine
ports:
- 127.0.0.1:8123:8123
- 127.0.0.1:9000:9000
Expand Down
2 changes: 2 additions & 0 deletions openmeter/streaming/clickhouse/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
AsyncInsert bool
AsyncInsertWait bool
InsertQuerySettings map[string]string
MeterQuerySettings map[string]string
ProgressManager progressmanager.Service
SkipCreateTables bool
}
Expand Down Expand Up @@ -169,6 +170,7 @@ func (c *Connector) QueryMeter(ctx context.Context, namespace string, meter mete
GroupBy: groupBy,
WindowSize: params.WindowSize,
WindowTimeZone: params.WindowTimeZone,
QuerySettings: c.config.MeterQuerySettings,
}

// Load cached rows if any
Expand Down
80 changes: 55 additions & 25 deletions openmeter/streaming/clickhouse/meter_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math"
"sort"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand All @@ -26,6 +27,7 @@ type queryMeter struct {
GroupBy []string
WindowSize *meterpkg.WindowSize
WindowTimeZone *time.Location
QuerySettings map[string]string
}

// from returns the from time for the query.
Expand Down Expand Up @@ -150,7 +152,7 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
// TODO: remove this when we don't round to the nearest minute anymore
// We round them to the nearest minute to ensure the result is the same as with
// streaming connector using materialized views with per minute windows
selectColumn := fmt.Sprintf("tumbleStart(min(%s), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(%s), toIntervalMinute(1)) AS windowend", timeColumn, timeColumn)
selectColumn := fmt.Sprintf("toStartOfMinute(min(%s)) AS windowstart, toStartOfMinute(max(%s)) + INTERVAL 1 MINUTE AS windowend", timeColumn, timeColumn)
selectColumns = append(selectColumns, selectColumn)
}

Expand Down Expand Up @@ -207,6 +209,9 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
query := sqlbuilder.ClickHouse.NewSelectBuilder()
query.Select(selectColumns...)
query.From(tableName)

// Prewhere clauses

query.Where(query.Equal(getColumn("namespace"), d.Namespace))
query.Where(query.Equal(getColumn("type"), d.Meter.EventType))

Expand All @@ -218,15 +223,32 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
query.Where(query.Or(slicesx.Map(d.Subject, mapFunc)...))
}

// Apply the time where clause
from := d.from()

if from != nil {
query.Where(query.GreaterEqualThan(timeColumn, from.Unix()))
}

if d.To != nil {
query.Where(query.LessThan(timeColumn, d.To.Unix()))
}

var sqlPreWhere string

if len(d.FilterGroupBy) > 0 {
// We sort the group by s to ensure the query is deterministic
groupByKeys := make([]string, 0, len(d.FilterGroupBy))
sqlPreWhere, _ = query.Build()
dataColumn := getColumn("data")

// We sort the group bys to ensure the query is deterministic
filterGroupByKeys := make([]string, 0, len(d.FilterGroupBy))
for k := range d.FilterGroupBy {
groupByKeys = append(groupByKeys, k)
filterGroupByKeys = append(filterGroupByKeys, k)
}
sort.Strings(groupByKeys)
sort.Strings(filterGroupByKeys)

for _, groupByKey := range groupByKeys {
// Where clauses
for _, groupByKey := range filterGroupByKeys {
if _, ok := d.Meter.GroupBy[groupByKey]; !ok {
return "", nil, fmt.Errorf("meter does not have group by: %s", groupByKey)
}
Expand All @@ -238,38 +260,46 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
return "", nil, fmt.Errorf("empty filter for group by: %s", groupByKey)
}
mapFunc := func(value string) string {
column := fmt.Sprintf("JSON_VALUE(%s, '%s')", getColumn("data"), groupByJSONPath)

// Subject is a special case
if groupByKey == "subject" {
column = "subject"
}

return fmt.Sprintf("%s = '%s'", column, sqlbuilder.Escape((value)))
return fmt.Sprintf("JSON_VALUE(%s, '%s') = '%s'", dataColumn, groupByJSONPath, sqlbuilder.Escape((value)))
}

query.Where(query.Or(slicesx.Map(values, mapFunc)...))
}
}

// Apply the time where clause
from := d.from()

if from != nil {
query.Where(query.GreaterEqualThan(timeColumn, from.Unix()))
}

if d.To != nil {
query.Where(query.LessThan(timeColumn, d.To.Unix()))
}

// Group by
query.GroupBy(groupByColumns...)

if groupByWindowSize {
query.OrderBy("windowstart")
}

sql, args := query.Build()

// Only add prewhere if there are filters on JSON data
if sqlPreWhere != "" {
sqlParts := strings.Split(sql, sqlPreWhere)
sqlAfter := sqlParts[1]

if strings.HasPrefix(sqlAfter, " AND") {
sqlAfter = strings.Replace(sqlAfter, "AND", "WHERE", 1)
}

sqlPreWhere = strings.Replace(sqlPreWhere, "WHERE", "PREWHERE", 1)
sql = fmt.Sprintf("%s%s", sqlPreWhere, sqlAfter)
}

Comment on lines +279 to +291
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Refactor the fragile string manipulation used to build the PREWHERE clause.

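The current approach, which splits the final SQL on the previously built prefix and then replaces keywords, is error-prone and could break if the query structure changes. For example, if the prefix is not found in the final SQL, `strings.Split` returns a single element and `sqlParts[1]` panics. This makes the code difficult to maintain.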

Consider using a more robust approach:

  1. Build PREWHERE and WHERE clauses separately using the query builder
  2. Use a custom SQL builder that supports PREWHERE natively
  3. Or at minimum, use more robust string manipulation with proper parsing

Example approach:

// Build PREWHERE conditions separately
prewhereBuilder := sqlbuilder.ClickHouse.NewSelectBuilder()
prewhereConditions := []string{
    prewhereBuilder.Equal(getColumn("namespace"), d.Namespace),
    prewhereBuilder.Equal(getColumn("type"), d.Meter.EventType),
}

// Build WHERE conditions for filters
whereConditions := []string{}
// ... add filter conditions to whereConditions

// Construct the final query with both PREWHERE and WHERE
🤖 Prompt for AI Agents
In openmeter/streaming/clickhouse/meter_query.go around lines 279 to 291, the
current string manipulation to insert the PREWHERE clause by splitting and
replacing parts of the SQL query is fragile and error-prone. To fix this,
refactor the code to build PREWHERE and WHERE clauses separately using a query
builder that supports PREWHERE natively or at least construct the conditions as
separate lists before combining them. Avoid direct string splitting and
replacements; instead, assemble the final query by explicitly adding PREWHERE
conditions first and then WHERE conditions to ensure correctness and
maintainability.

// Add settings
settings := []string{
"optimize_move_to_prewhere = 1",
"allow_reorder_prewhere_conditions = 1",
}
for key, value := range d.QuerySettings {
settings = append(settings, fmt.Sprintf("%s = %s", key, value))
}

sql = sql + fmt.Sprintf(" SETTINGS %s", strings.Join(settings, ", "))

Comment on lines +297 to +302
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add validation for QuerySettings to prevent SQL injection.

The QuerySettings map keys and values are interpolated directly into the SQL query without any validation or escaping, which could lead to SQL injection vulnerabilities if these settings come from untrusted sources.

Apply this diff to add basic validation:

+// Validate query settings to prevent SQL injection
+for key, value := range d.QuerySettings {
+    // Validate key contains only alphanumeric and underscore
+    if !regexp.MustCompile(`^[a-zA-Z0-9_]+$`).MatchString(key) {
+        return "", nil, fmt.Errorf("invalid query setting key: %s", key)
+    }
+    // Validate value contains only safe characters
+    if !regexp.MustCompile(`^[a-zA-Z0-9_\s,=.]+$`).MatchString(value) {
+        return "", nil, fmt.Errorf("invalid query setting value for %s: %s", key, value)
+    }
+}

 // Add settings
 settings := []string{
     "optimize_move_to_prewhere = 1",
     "allow_reorder_prewhere_conditions = 1",
 }
 for key, value := range d.QuerySettings {
     settings = append(settings, fmt.Sprintf("%s = %s", key, value))
 }

Alternatively, consider using a whitelist of allowed ClickHouse settings.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for key, value := range d.QuerySettings {
settings = append(settings, fmt.Sprintf("%s = %s", key, value))
}
sql = sql + fmt.Sprintf(" SETTINGS %s", strings.Join(settings, ", "))
// Validate query settings to prevent SQL injection
for key, value := range d.QuerySettings {
// Validate key contains only alphanumeric and underscore
if !regexp.MustCompile(`^[a-zA-Z0-9_]+$`).MatchString(key) {
return "", nil, fmt.Errorf("invalid query setting key: %s", key)
}
// Validate value contains only safe characters
if !regexp.MustCompile(`^[a-zA-Z0-9_\s,=.]+$`).MatchString(value) {
return "", nil, fmt.Errorf("invalid query setting value for %s: %s", key, value)
}
}
for key, value := range d.QuerySettings {
settings = append(settings, fmt.Sprintf("%s = %s", key, value))
}
sql = sql + fmt.Sprintf(" SETTINGS %s", strings.Join(settings, ", "))
🤖 Prompt for AI Agents
In openmeter/streaming/clickhouse/meter_query.go around lines 297 to 302, the
QuerySettings map keys and values are inserted directly into the SQL query without
validation, risking SQL injection. To fix this, add validation to ensure each
key and value conforms to expected patterns or use a whitelist of allowed
settings before appending them to the query. Reject or sanitize any entries that
do not meet these criteria to prevent injection vulnerabilities.

return sql, args, nil
}

Expand Down
Loading
Loading