Skip to content

Commit 623a9b6

Browse files
committed
Fix query construction
`sqlutil.INList` repeats the first value of the list and leaves the last value off the resulting string. Fixed this bug and replaced uses of unescaped `IN` list creation with calls to `sqlutil.INList` where appropriate. Adjusted metrics and heartbeat to use driver-level parameter interpolation instead of custom escaping, deprecating several function in `sqlutil`. Removed the use of `sqlutil.INList` in existing blip code but marked as deprecated.
1 parent 62c65ca commit 623a9b6

File tree

20 files changed

+285
-108
lines changed

20 files changed

+285
-108
lines changed

dbconn/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (f factory) Make(cfg blip.ConfigMonitor) (*sql.DB, string, error) {
116116
// ----------------------------------------------------------------------
117117
// Load TLS
118118

119-
params := []string{"parseTime=true"}
119+
params := []string{"parseTime=true", "interpolateParams=true"}
120120

121121
// Go says "either ServerName or InsecureSkipVerify must be specified".
122122
// This is a pathological case: socket and TLS but no hostname to verify

dbconn/factory_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func TestConnect(t *testing.T) {
5454
defer db.Close()
5555

5656
// Make returns a print-safe DSN: password ("test") replaced with "..."
57-
expectDSN := fmt.Sprintf("%s:...@tcp(%s)/?parseTime=true", cfg.Username, cfg.Hostname)
57+
expectDSN := fmt.Sprintf("%s:...@tcp(%s)/?interpolateParams=true&parseTime=true", cfg.Username, cfg.Hostname)
5858
if dsn != expectDSN {
5959
t.Errorf("got DSN '%s', expected '%s'", dsn, expectDSN)
6060
}

heartbeat/reader.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type BlipReader struct {
5656
isRepl bool
5757
event event.MonitorReceiver
5858
query string
59+
params []interface{}
5960
}
6061

6162
type BlipReaderArgs struct {
@@ -91,13 +92,16 @@ func NewBlipReader(args BlipReaderArgs) *BlipReader {
9192
var where string
9293
if r.srcId != "" {
9394
blip.Debug("%s: heartbeat from source %s", r.monitorId, r.srcId)
94-
where = "WHERE src_id='" + r.srcId + "'" // default
95+
where = "WHERE src_id=?" // default
96+
r.params = []interface{}{r.srcId}
9597
} else if r.srcRole != "" {
9698
blip.Debug("%s: heartbeat from role %s", r.monitorId, r.srcRole)
97-
where = "WHERE src_role='" + r.srcRole + "' ORDER BY ts DESC LIMIT 1"
99+
where = "WHERE src_role=? ORDER BY ts DESC LIMIT 1"
100+
r.params = []interface{}{r.srcRole}
98101
} else {
99102
blip.Debug("%s: heartbeat from latest (max ts)", r.monitorId)
100-
where = "WHERE src_id != '" + args.MonitorId + "' ORDER BY ts DESC LIMIT 1"
103+
where = "WHERE src_id != ? ORDER BY ts DESC LIMIT 1"
104+
r.params = []interface{}{args.MonitorId}
101105
}
102106
if r.replCheck != "" {
103107
cols[4] = "@@" + r.replCheck
@@ -136,7 +140,7 @@ func (r *BlipReader) run() {
136140
}
137141

138142
ctx, cancel = context.WithTimeout(context.Background(), ReadTimeout)
139-
err = r.db.QueryRowContext(ctx, r.query).Scan(&now, &last, &freq, &srcId, &isRepl)
143+
err = r.db.QueryRowContext(ctx, r.query, r.params...).Scan(&now, &last, &freq, &srcId, &isRepl)
140144
cancel()
141145
if err != nil {
142146
blip.Debug("%s: %v", r.monitorId, err)

heartbeat/writer.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,18 +84,19 @@ func (w *Writer) Write(stopChan, doneChan chan struct{}) error {
8484
// This must be done else the simpler UPDATE statements below, which is the
8585
// real heartbeat, will fail because there's no match row.
8686
var ping string
87+
var params []interface{}
8788
if w.srcRole != "" {
88-
ping = fmt.Sprintf("INSERT INTO %s (src_id, src_role, ts, freq) VALUES ('%s', '%s', NOW(3), %d) ON DUPLICATE KEY UPDATE ts=NOW(3), freq=%d, src_role='%s'",
89-
w.table, w.srcId, w.srcRole, w.freq.Milliseconds(), w.freq.Milliseconds(), w.srcRole)
89+
ping = fmt.Sprintf("INSERT INTO %s (src_id, src_role, ts, freq) VALUES (?, ?, NOW(3), ?) ON DUPLICATE KEY UPDATE ts=NOW(3), freq=?, src_role=?", w.table)
90+
params = []interface{}{w.srcId, w.srcRole, w.freq.Milliseconds(), w.freq.Milliseconds(), w.srcRole}
9091
} else {
91-
ping = fmt.Sprintf("INSERT INTO %s (src_id, src_role, ts, freq) VALUES ('%s', NULL, NOW(3), %d) ON DUPLICATE KEY UPDATE ts=NOW(3), freq=%d, src_role=NULL",
92-
w.table, w.monitorId, w.freq.Milliseconds(), w.freq.Milliseconds())
92+
ping = fmt.Sprintf("INSERT INTO %s (src_id, src_role, ts, freq) VALUES (?, NULL, NOW(3), ?) ON DUPLICATE KEY UPDATE ts=NOW(3), freq=?, src_role=NULL", w.table)
93+
params = []interface{}{w.monitorId, w.freq.Milliseconds(), w.freq.Milliseconds()}
9394
}
9495
blip.Debug("%s: first heartbeat: %s", w.monitorId, ping)
9596
for {
9697
status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "first insert")
9798
ctx, cancel = context.WithTimeout(context.Background(), WriteTimeout)
98-
_, err = w.db.ExecContext(ctx, ping)
99+
_, err = w.db.ExecContext(ctx, ping, params...)
99100
cancel()
100101
if err == nil { // success
101102
status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "sleep")
@@ -127,14 +128,14 @@ func (w *Writer) Write(stopChan, doneChan chan struct{}) error {
127128
// to void 2 wasted round trips: prep (waste), exec, close (waste).
128129
// This risk of SQL injection is miniscule because both table and monitorId
129130
// are sanitized, and Blip should only have write privs on its heartbeat table.
130-
ping = fmt.Sprintf("UPDATE %s SET ts=NOW(3) WHERE src_id='%s'", w.table, w.srcId)
131+
ping = fmt.Sprintf("UPDATE %s SET ts=NOW(3) WHERE src_id=?", w.table)
131132
blip.Debug("%s: heartbeat: %s", w.monitorId, ping)
132133
for {
133134
time.Sleep(w.freq)
134135

135136
status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "write")
136137
ctx, cancel = context.WithTimeout(context.Background(), WriteTimeout)
137-
_, err = w.db.ExecContext(ctx, ping)
138+
_, err = w.db.ExecContext(ctx, ping, w.srcId)
138139
cancel()
139140
if err != nil {
140141
blip.Debug("%s: %s", w.monitorId, err.Error())

metrics/innodb/innodb.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,18 @@ const (
4242
// InnoDB collects metrics for the innodb domain. The source is
4343
// information_schema.innodb_metrics.
4444
type InnoDB struct {
45-
db *sql.DB
46-
query map[string]string
45+
db *sql.DB
46+
query map[string]string
47+
params map[string][]interface{}
4748
}
4849

4950
var _ blip.Collector = &InnoDB{}
5051

5152
func NewInnoDB(db *sql.DB) *InnoDB {
5253
return &InnoDB{
53-
db: db,
54-
query: map[string]string{},
54+
db: db,
55+
query: map[string]string{},
56+
params: map[string][]interface{}{},
5557
}
5658
}
5759

@@ -95,18 +97,21 @@ LEVEL:
9597
switch all {
9698
case "all":
9799
c.query[level.Name] = baseQuery
100+
c.params[level.Name] = []interface{}{}
98101
case "enabled":
99102
c.query[level.Name] = baseQuery + " WHERE status='enabled'"
103+
c.params[level.Name] = []interface{}{}
100104
default:
101-
c.query[level.Name] = baseQuery + " WHERE name IN (" + sqlutil.INList(dom.Metrics, "'") + ")"
105+
c.query[level.Name] = baseQuery + " WHERE name IN (" + sqlutil.PlaceholderList(len(dom.Metrics)) + ")"
106+
c.params[level.Name] = sqlutil.ToInterfaceArray(dom.Metrics)
102107
}
103108
blip.Debug("%s: innodb metrics at %s: %s", plan.MonitorId, level.Name, c.query[level.Name])
104109
}
105110
return nil, nil
106111
}
107112

108113
func (c *InnoDB) Collect(ctx context.Context, levelName string) ([]blip.MetricValue, error) {
109-
rows, err := c.db.QueryContext(ctx, c.query[levelName])
114+
rows, err := c.db.QueryContext(ctx, c.query[levelName], c.params[levelName]...)
110115
if err != nil {
111116
return nil, err
112117
}

metrics/repl.lag/lag.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"github.com/cashapp/blip"
1313
"github.com/cashapp/blip/heartbeat"
14-
"github.com/cashapp/blip/sqlutil"
1514
)
1615

1716
const (
@@ -207,12 +206,28 @@ LEVEL:
207206

208207
c.dropNotAReplica[levelName] = !blip.Bool(dom.Options[OPT_REPORT_NOT_A_REPLICA])
209208
c.defaultChannelNameOverrides[levelName] = dom.Options[OPT_DEFAULT_CHANNEL_NAME]
210-
c.replCheck = sqlutil.CleanObjectName(dom.Options[OPT_REPL_CHECK]) // @todo sanitize better
209+
if err := c.verifyReplCheck(ctx, dom.Options[OPT_REPL_CHECK]); err != nil {
210+
return cleanup, err
211+
}
212+
c.replCheck = dom.Options[OPT_REPL_CHECK]
211213
}
212214

213215
return cleanup, nil
214216
}
215217

218+
func (c *Lag) verifyReplCheck(ctx context.Context, variable string) error {
219+
if variable == "" {
220+
return nil
221+
}
222+
223+
row := c.db.QueryRowContext(ctx, "SHOW GLOBAL VARIABLES LIKE ?", variable)
224+
var value string
225+
if err := row.Scan(&value, &value); err != nil {
226+
return fmt.Errorf("failed to verify replication check variable %s: %s", variable, err)
227+
}
228+
return nil
229+
}
230+
216231
func (c *Lag) Collect(ctx context.Context, levelName string) ([]blip.MetricValue, error) {
217232
switch c.lagWriterIn[levelName] {
218233
case LAG_WRITER_BLIP:

metrics/size.database/database.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,19 @@ const (
2424
type Database struct {
2525
db *sql.DB
2626
// --
27-
query map[string]string // keyed on level
28-
total map[string]bool // keyed on level
27+
query map[string]string // keyed on level
28+
params map[string][]interface{}
29+
total map[string]bool // keyed on level
2930
}
3031

3132
var _ blip.Collector = &Database{}
3233

3334
func NewDatabase(db *sql.DB) *Database {
3435
return &Database{
35-
db: db,
36-
query: map[string]string{},
37-
total: map[string]bool{},
36+
db: db,
37+
query: map[string]string{},
38+
params: map[string][]interface{}{},
39+
total: map[string]bool{},
3840
}
3941
}
4042

@@ -97,11 +99,12 @@ LEVEL:
9799
if !ok {
98100
continue LEVEL // not collected in this level
99101
}
100-
q, err := DataSizeQuery(dom.Options, c.Help())
102+
q, p, err := DataSizeQuery(dom.Options, c.Help())
101103
if err != nil {
102104
return nil, err
103105
}
104106
c.query[level.Name] = q
107+
c.params[level.Name] = p
105108

106109
if dom.Options[OPT_TOTAL] == "yes" {
107110
c.total[level.Name] = true
@@ -116,7 +119,7 @@ func (c *Database) Collect(ctx context.Context, levelName string) ([]blip.Metric
116119
return nil, nil // not collected in this level
117120
}
118121

119-
rows, err := c.db.QueryContext(ctx, q)
122+
rows, err := c.db.QueryContext(ctx, q, c.params[levelName]...)
120123
if err != nil {
121124
return nil, err
122125
}

metrics/size.database/query.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/cashapp/blip/sqlutil"
1111
)
1212

13-
func DataSizeQuery(set map[string]string, def blip.CollectorHelp) (string, error) {
13+
func DataSizeQuery(set map[string]string, def blip.CollectorHelp) (string, []interface{}, error) {
1414
cols := ""
1515
groupBy := ""
1616
if val := set[OPT_TOTAL]; val == "only" {
@@ -26,32 +26,42 @@ func DataSizeQuery(set map[string]string, def blip.CollectorHelp) (string, error
2626
like = true
2727
}
2828

29+
var params []interface{}
30+
2931
where := ""
3032
if include := set[OPT_INCLUDE]; include != "" {
31-
o := sqlutil.ObjectList(include, "'")
33+
o := strings.Split(include, ",")
34+
params = make([]interface{}, 0, len(o))
35+
3236
if like {
3337
for i := range o {
34-
o[i] = "table_schema LIKE " + o[i]
38+
params = append(params, o[i])
39+
o[i] = "table_schema LIKE ?"
3540
}
3641
where = strings.Join(o, " OR ")
3742
} else {
38-
where = fmt.Sprintf("table_schema IN (%s)", strings.Join(o, ","))
43+
where = fmt.Sprintf("table_schema IN (%s)", sqlutil.PlaceholderList(len(o)))
44+
params = sqlutil.ToInterfaceArray(o)
3945
}
4046
} else {
4147
exclude := set[OPT_EXCLUDE]
4248
if exclude == "" {
4349
exclude = def.Options[OPT_EXCLUDE].Default
4450
}
45-
o := sqlutil.ObjectList(exclude, "'")
51+
o := strings.Split(exclude, ",")
52+
params = make([]interface{}, 0, len(o))
53+
4654
if like {
4755
for i := range o {
48-
o[i] = "table_schema NOT LIKE " + o[i]
56+
params = append(params, o[i])
57+
o[i] = "table_schema NOT LIKE ?"
4958
}
5059
where = strings.Join(o, " AND ")
5160
} else {
52-
where = fmt.Sprintf("table_schema NOT IN (%s)", strings.Join(o, ","))
61+
where = fmt.Sprintf("table_schema NOT IN (%s)", sqlutil.PlaceholderList(len(o)))
62+
params = sqlutil.ToInterfaceArray(o)
5363
}
5464
}
5565

56-
return "SELECT " + cols + " FROM information_schema.tables WHERE " + where + groupBy, nil
66+
return "SELECT " + cols + " FROM information_schema.tables WHERE " + where + groupBy, params, nil
5767
}

0 commit comments

Comments
 (0)