Skip to content

Commit 81b1e27

Browse files
0xgouda and pashagolub
authored
[+] support custom configs for maintenance tasks (#1018)
* Introduce `--maintenance-interval` flag - modify `--retention` to accept PostgreSQL interval string. * Use new flags to control deletion and maintenance tasks. * Test `MaintainUniqueSources()` and `DeleteOldPartitions()` * update docs reference page to mention new flag. * run `DeleteOldPartitions()` with `retentionInterval` not `maintenanceInterval` * Rename test * Capitalize `set` in flag description. * Fix typo in comment * rename `CmdOpts.Retention` to `RetentionInterval` * add `PostgresWriter.scheduleJob()` function --------- Co-authored-by: Pavlo Golub <pavlo.golub@gmail.com>
1 parent 0a119d1 commit 81b1e27

File tree

4 files changed

+372
-153
lines changed

4 files changed

+372
-153
lines changed

docs/reference/cli_env.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,23 @@ It reads the configuration from the specified sources and metrics, then begins c
9898
ENV: `$PW_BATCHING_DELAY`
9999

100100
- `--partition-interval=`
101-
Time range for PostgreSQL sink table partitions. Must be a valid PostgreSQL interval expression. (default: 1 week)
101+
102+
Time range for PostgreSQL sink table partitions. Must be a valid PostgreSQL interval. (default: 1 week)
102103
ENV: `$PW_PARTITION_INTERVAL`
103104

104105
Example:
105106
`--partition-inteval="3 weeks 4 days"`,
106107

107108
- `--retention=`
108109

109-
If set, metrics older than that will be deleted (default: "14 days").
110+
Delete metrics older than this. Set to zero to disable. Must be a valid PostgreSQL interval. (default: 14 days)
110111
ENV: `$PW_RETENTION`
111112

113+
- `--maintenance-interval=`
114+
115+
Run pgwatch maintenance tasks on sinks with this interval; Set to zero to disable. Must be a valid PostgreSQL interval. (default: 12 hours)
116+
ENV: `$PW_MAINTENANCE_INTERVAL`
117+
112118
- `--real-dbname-field=`
113119

114120
Tag key for real database name (default: real_dbname).

internal/sinks/cmdopts.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import "time"
66
type CmdOpts struct {
77
Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
88
BatchingDelay time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Sink-specific batching flush delay; may be ignored by some sinks" default:"950ms" env:"PW_BATCHING_DELAY"`
9-
PartitionInterval string `long:"partition-interval" mapstructure:"partition-interval" description:"Time range for PostgreSQL sink table partitions. Must be a valid PostgreSQL interval expression." default:"1 week" env:"PW_PARTITION_INTERVAL"`
10-
Retention string `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14 days" env:"PW_RETENTION"`
9+
PartitionInterval string `long:"partition-interval" mapstructure:"partition-interval" description:"Time range for PostgreSQL sink time partitions. Must be a valid PostgreSQL interval." default:"1 week" env:"PW_PARTITION_INTERVAL"`
10+
RetentionInterval string `long:"retention" mapstructure:"retention" description:"Delete metrics older than this. Set to zero to disable. Must be a valid PostgreSQL interval." default:"14 days" env:"PW_RETENTION"`
11+
MaintenanceInterval string `long:"maintenance-interval" mapstructure:"maintenance-interval" description:"Run pgwatch maintenance tasks on sinks with this interval; Set to zero to disable. Must be a valid PostgreSQL interval." default:"12 hours" env:"PW_MAINTENANCE_INTERVAL"`
1112
RealDbnameField string `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
1213
SystemIdentifierField string `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`
1314
}

internal/sinks/postgres.go

Lines changed: 122 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,21 @@ var (
2525
targetColumns = [...]string{"time", "dbname", "data", "tag_data"}
2626
)
2727

28+
// PostgresWriter is a sink that writes metric measurements to a Postgres database.
29+
// At the moment, it supports both Postgres and TimescaleDB as a storage backend.
30+
// However, one is able to use any Postgres-compatible database as a storage backend,
31+
// e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
32+
type PostgresWriter struct {
33+
ctx context.Context
34+
sinkDb db.PgxPoolIface
35+
metricSchema DbStorageSchemaType
36+
opts *CmdOpts
37+
retentionInterval time.Duration
38+
maintenanceInterval time.Duration
39+
input chan metrics.MeasurementEnvelope
40+
lastError chan error
41+
}
42+
2843
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
2944
var conn db.PgxPoolIface
3045
if conn, err = db.New(ctx, connstr); err != nil {
@@ -43,18 +58,27 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
4358
lastError: make(chan error),
4459
sinkDb: conn,
4560
}
46-
var runDeleteOldPartitions bool
4761
if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
48-
if err = conn.QueryRow(ctx, "SELECT $1::interval > '0'::interval", opts.Retention).Scan(&runDeleteOldPartitions); err != nil {
62+
var isValidPartitionInterval bool
63+
if err = conn.QueryRow(ctx,
64+
"SELECT extract(epoch from $1::interval), extract(epoch from $2::interval), $3::interval >= '1h'::interval",
65+
opts.RetentionInterval, opts.MaintenanceInterval, opts.PartitionInterval,
66+
).Scan(&pgw.retentionInterval, &pgw.maintenanceInterval, &isValidPartitionInterval); err != nil {
4967
return err
5068
}
51-
var isValidInterval bool
52-
err = conn.QueryRow(ctx, "SELECT $1::interval >= '1h'::interval", opts.PartitionInterval).Scan(&isValidInterval)
53-
if err != nil {
54-
return err
69+
70+
// epoch returns seconds but time.Duration represents nanoseconds
71+
pgw.retentionInterval *= time.Second
72+
pgw.maintenanceInterval *= time.Second
73+
74+
if !isValidPartitionInterval {
75+
return fmt.Errorf("--partition-interval must be at least 1 hour, got: %s", opts.PartitionInterval)
76+
}
77+
if pgw.maintenanceInterval < 0 {
78+
return errors.New("--maintenance-interval must be a positive PostgreSQL interval or 0 to disable it")
5579
}
56-
if !isValidInterval {
57-
return fmt.Errorf("partition interval must be at least 1 hour, got: %s", opts.PartitionInterval)
80+
if pgw.retentionInterval < 0 {
81+
return errors.New("--retention must be a positive PostgreSQL interval or 0 to disable it")
5882
}
5983

6084
l.Info("initialising measurements database...")
@@ -77,10 +101,10 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
77101
if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
78102
return
79103
}
80-
if runDeleteOldPartitions {
81-
go pgw.deleteOldPartitions()
82-
}
83-
go pgw.maintainUniqueSources()
104+
105+
pgw.scheduleJob(pgw.maintenanceInterval, pgw.MaintainUniqueSources)
106+
pgw.scheduleJob(pgw.retentionInterval, pgw.DeleteOldPartitions)
107+
84108
go pgw.poll()
85109
l.Info(`measurements sink is activated`)
86110
return
@@ -115,19 +139,6 @@ var (
115139
}
116140
)
117141

118-
// PostgresWriter is a sink that writes metric measurements to a Postgres database.
119-
// At the moment, it supports both Postgres and TimescaleDB as a storage backend.
120-
// However, one is able to use any Postgres-compatible database as a storage backend,
121-
// e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
122-
type PostgresWriter struct {
123-
ctx context.Context
124-
sinkDb db.PgxPoolIface
125-
metricSchema DbStorageSchemaType
126-
opts *CmdOpts
127-
input chan metrics.MeasurementEnvelope
128-
lastError chan error
129-
}
130-
131142
type ExistingPartitionInfo struct {
132143
StartTime time.Time
133144
EndTime time.Time
@@ -148,6 +159,21 @@ const (
148159
DbStorageSchemaTimescale
149160
)
150161

162+
func (pgw *PostgresWriter) scheduleJob(interval time.Duration, job func()) {
163+
if interval > 0 {
164+
go func() {
165+
for {
166+
select {
167+
case <-pgw.ctx.Done():
168+
return
169+
case <-time.After(interval):
170+
job()
171+
}
172+
}
173+
}()
174+
}
175+
}
176+
151177
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
152178
var isTs bool
153179
pgw.metricSchema = DbStorageSchemaPostgres
@@ -464,117 +490,105 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
464490
return nil
465491
}
466492

467-
// deleteOldPartitions is a background task that deletes old partitions from the measurements DB
468-
func (pgw *PostgresWriter) deleteOldPartitions() {
493+
// DeleteOldPartitions is a background task that deletes old partitions from the measurements DB
494+
func (pgw *PostgresWriter) DeleteOldPartitions() {
469495
l := log.GetLogger(pgw.ctx)
470-
for {
471-
var partsDropped int
472-
err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
473-
pgw.opts.Retention).Scan(&partsDropped)
474-
if err != nil {
475-
l.Error("Could not drop old time partitions:", err)
476-
} else if partsDropped > 0 {
477-
l.Infof("Dropped %d old time partitions", partsDropped)
478-
}
479-
select {
480-
case <-pgw.ctx.Done():
481-
return
482-
case <-time.After(time.Hour * 12):
483-
}
496+
var partsDropped int
497+
err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
498+
pgw.opts.RetentionInterval).Scan(&partsDropped)
499+
if err != nil {
500+
l.Error("Could not drop old time partitions:", err)
501+
} else if partsDropped > 0 {
502+
l.Infof("Dropped %d old time partitions", partsDropped)
484503
}
485504
}
486505

487-
// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
506+
// MaintainUniqueSources is a background task that maintains a mapping of unique sources
507+
// in each metric table in admin.all_distinct_dbname_metrics.
488508
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
489-
func (pgw *PostgresWriter) maintainUniqueSources() {
509+
func (pgw *PostgresWriter) MaintainUniqueSources() {
490510
logger := log.GetLogger(pgw.ctx)
491-
// due to metrics deletion the listing can go out of sync (a trigger not really wanted)
511+
492512
sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
493513
sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
494514
sqlDistinct := `
495515
WITH RECURSIVE t(dbname) AS (
496516
SELECT MIN(dbname) AS dbname FROM %s
497517
UNION
498-
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
518+
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t
519+
)
499520
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
500521
sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
501522
sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
502523
sqlAdd := `
503-
INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
524+
INSERT INTO admin.all_distinct_dbname_metrics
525+
SELECT u, $2 FROM (select unnest($1::text[]) as u) x
504526
WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
505527
RETURNING *`
506528

507-
for {
508-
select {
509-
case <-pgw.ctx.Done():
510-
return
511-
case <-time.After(time.Hour * 24):
512-
}
513-
var lock bool
514-
logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
515-
if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
516-
logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
517-
continue
518-
}
519-
if !lock {
520-
logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
521-
continue
522-
}
529+
var lock bool
530+
logger.Infof("Trying to get admin.all_distinct_dbname_metrics maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
531+
if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
532+
logger.Error("Getting admin.all_distinct_dbname_metrics maintainer advisory lock failed:", err)
533+
return
534+
}
535+
if !lock {
536+
logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
537+
return
538+
}
523539

524-
logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
525-
rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
526-
allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
527-
if err != nil {
528-
logger.Error(err)
529-
continue
530-
}
540+
logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
541+
rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
542+
allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
543+
if err != nil {
544+
logger.Error(err)
545+
return
546+
}
531547

532-
for _, tableName := range allDistinctMetricTables {
533-
foundDbnamesMap := make(map[string]bool)
534-
foundDbnamesArr := make([]string, 0)
535-
metricName := strings.Replace(tableName, "public.", "", 1)
548+
for _, tableName := range allDistinctMetricTables {
549+
foundDbnamesMap := make(map[string]bool)
550+
foundDbnamesArr := make([]string, 0)
551+
metricName := strings.Replace(tableName, "public.", "", 1)
536552

537-
logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
538-
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
539-
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
540-
// ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName))
541-
if err != nil {
542-
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
543-
break
544-
}
545-
for _, drDbname := range ret {
546-
foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
547-
}
553+
logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
554+
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
555+
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
556+
if err != nil {
557+
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
558+
break
559+
}
560+
for _, drDbname := range ret {
561+
foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
562+
}
548563

549-
// delete all that are not known and add all that are not there
550-
for k := range foundDbnamesMap {
551-
foundDbnamesArr = append(foundDbnamesArr, k)
552-
}
553-
if len(foundDbnamesArr) == 0 { // delete all entries for given metric
554-
logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)
564+
// delete all that are not known and add all that are not there
565+
for k := range foundDbnamesMap {
566+
foundDbnamesArr = append(foundDbnamesArr, k)
567+
}
568+
if len(foundDbnamesArr) == 0 { // delete all entries for given metric
569+
logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)
555570

556-
_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
557-
if err != nil {
558-
logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
559-
}
560-
continue
561-
}
562-
cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
571+
_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
563572
if err != nil {
564-
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
565-
} else if cmdTag.RowsAffected() > 0 {
566-
logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
573+
logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
567574
}
568-
cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
569-
if err != nil {
570-
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
571-
} else if cmdTag.RowsAffected() > 0 {
572-
logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
573-
}
574-
time.Sleep(time.Minute)
575+
continue
575576
}
576-
577+
cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
578+
if err != nil {
579+
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
580+
} else if cmdTag.RowsAffected() > 0 {
581+
logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
582+
}
583+
cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
584+
if err != nil {
585+
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
586+
} else if cmdTag.RowsAffected() > 0 {
587+
logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
588+
}
589+
time.Sleep(time.Minute)
577590
}
591+
578592
}
579593

580594
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {

0 commit comments

Comments (0)