Skip to content

Commit 83345d9

Browse files
committed
Merge branch 'master' into delete-dropped-tables-from-listing-table
2 parents 068a53f + 81b1e27 commit 83345d9

File tree

3 files changed

+70
-79
lines changed

3 files changed

+70
-79
lines changed

internal/sinks/cmdopts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ type CmdOpts struct {
77
Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
88
BatchingDelay time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Sink-specific batching flush delay; may be ignored by some sinks" default:"950ms" env:"PW_BATCHING_DELAY"`
99
PartitionInterval string `long:"partition-interval" mapstructure:"partition-interval" description:"Time range for PostgreSQL sink time partitions. Must be a valid PostgreSQL interval." default:"1 week" env:"PW_PARTITION_INTERVAL"`
10-
Retention string `long:"retention" mapstructure:"retention" description:"Delete metrics older than this. Set to zero to disable. Must be a valid PostgreSQL interval." default:"14 days" env:"PW_RETENTION"`
10+
RetentionInterval string `long:"retention" mapstructure:"retention" description:"Delete metrics older than this. Set to zero to disable. Must be a valid PostgreSQL interval." default:"14 days" env:"PW_RETENTION"`
1111
MaintenanceInterval string `long:"maintenance-interval" mapstructure:"maintenance-interval" description:"Run pgwatch maintenance tasks on sinks with this interval; Set to zero to disable. Must be a valid PostgreSQL interval." default:"12 hours" env:"PW_MAINTENANCE_INTERVAL"`
1212
RealDbnameField string `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
1313
SystemIdentifierField string `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`

internal/sinks/postgres.go

Lines changed: 26 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,17 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
5959
sinkDb: conn,
6060
}
6161
if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
62-
var isValidPartitionInterval bool
63-
if err = conn.QueryRow(ctx,
64-
"SELECT extract(epoch from $1::interval), extract(epoch from $2::interval), $3::interval >= '1h'::interval",
65-
opts.Retention, opts.MaintenanceInterval, opts.PartitionInterval,
62+
var isValidPartitionInterval bool
63+
if err = conn.QueryRow(ctx,
64+
"SELECT extract(epoch from $1::interval), extract(epoch from $2::interval), $3::interval >= '1h'::interval",
65+
opts.RetentionInterval, opts.MaintenanceInterval, opts.PartitionInterval,
6666
).Scan(&pgw.retentionInterval, &pgw.maintenanceInterval, &isValidPartitionInterval); err != nil {
6767
return err
6868
}
6969

70-
// multiply by (10 ^ 9) because epoch returns seconds
71-
// but time.Duration works in terms of nanoseconds
72-
pgw.retentionInterval *= 1_000_000_000
73-
pgw.maintenanceInterval *= 1_000_000_000
70+
// epoch returns seconds but time.Duration represents nanoseconds
71+
pgw.retentionInterval *= time.Second
72+
pgw.maintenanceInterval *= time.Second
7473

7574
if !isValidPartitionInterval {
7675
return fmt.Errorf("--partition-interval must be at least 1 hour, got: %s", opts.PartitionInterval)
@@ -103,31 +102,8 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
103102
return
104103
}
105104

106-
if pgw.maintenanceInterval > 0 {
107-
go func() {
108-
for {
109-
select {
110-
case <-pgw.ctx.Done():
111-
return
112-
case <-time.After(pgw.maintenanceInterval):
113-
pgw.MaintainUniqueSources()
114-
}
115-
}
116-
}()
117-
}
118-
119-
if pgw.retentionInterval > 0 {
120-
go func() {
121-
for {
122-
select {
123-
case <-pgw.ctx.Done():
124-
return
125-
case <-time.After(pgw.retentionInterval):
126-
pgw.DeleteOldPartitions()
127-
}
128-
}
129-
}()
130-
}
105+
pgw.scheduleJob(pgw.maintenanceInterval, pgw.MaintainUniqueSources)
106+
pgw.scheduleJob(pgw.retentionInterval, pgw.DeleteOldPartitions)
131107

132108
go pgw.poll()
133109
l.Info(`measurements sink is activated`)
@@ -183,6 +159,21 @@ const (
183159
DbStorageSchemaTimescale
184160
)
185161

162+
func (pgw *PostgresWriter) scheduleJob(interval time.Duration, job func()) {
163+
if interval > 0 {
164+
go func() {
165+
for {
166+
select {
167+
case <-pgw.ctx.Done():
168+
return
169+
case <-time.After(interval):
170+
job()
171+
}
172+
}
173+
}()
174+
}
175+
}
176+
186177
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
187178
var isTs bool
188179
pgw.metricSchema = DbStorageSchemaPostgres
@@ -504,15 +495,15 @@ func (pgw *PostgresWriter) DeleteOldPartitions() {
504495
l := log.GetLogger(pgw.ctx)
505496
var partsDropped int
506497
err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
507-
pgw.opts.Retention).Scan(&partsDropped)
498+
pgw.opts.RetentionInterval).Scan(&partsDropped)
508499
if err != nil {
509500
l.Error("Could not drop old time partitions:", err)
510501
} else if partsDropped > 0 {
511502
l.Infof("Dropped %d old time partitions", partsDropped)
512503
}
513504
}
514505

515-
// MaintainUniqueSources is a background task that maintains a mapping of unique sources
506+
// MaintainUniqueSources is a background task that maintains a mapping of unique sources
516507
// in each metric table in admin.all_distinct_dbname_metrics.
517508
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
518509
func (pgw *PostgresWriter) MaintainUniqueSources() {

internal/sinks/postgres_test.go

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestNewWriterFromPostgresConn(t *testing.T) {
4545

4646
conn.ExpectPing()
4747
conn.ExpectQuery("SELECT extract").WithArgs("1 day", "1 day", "1 hour").WillReturnRows(
48-
pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow(24 * time.Hour / 1_000_000_000, 24 * time.Hour / 1_000_000_000, true),
48+
pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true),
4949
)
5050
conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
5151
conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
@@ -54,10 +54,10 @@ func TestNewWriterFromPostgresConn(t *testing.T) {
5454
}
5555

5656
opts := &CmdOpts{
57-
BatchingDelay: time.Hour,
58-
Retention: "1 day",
57+
BatchingDelay: time.Hour,
58+
RetentionInterval: "1 day",
5959
MaintenanceInterval: "1 day",
60-
PartitionInterval: "1 hour",
60+
PartitionInterval: "1 hour",
6161
}
6262
pgw, err := NewWriterFromPostgresConn(ctx, conn, opts)
6363
assert.NoError(t, err)
@@ -508,7 +508,7 @@ func TestCopyFromMeasurements_CopyFail(t *testing.T) {
508508

509509
}
510510

511-
// tests interval string validation for all
511+
// tests interval string validation for all
512512
// cli flags that expect a PostgreSQL interval string
513513
func TestIntervalValidation(t *testing.T) {
514514
a := assert.New(t)
@@ -529,10 +529,10 @@ func TestIntervalValidation(t *testing.T) {
529529
connStr, _ := pgContainer.ConnectionString(ctx, "sslmode=disable")
530530

531531
opts := &CmdOpts{
532-
PartitionInterval: "1 minute",
532+
PartitionInterval: "1 minute",
533533
MaintenanceInterval: "-1 hours",
534-
Retention: "-1 hours",
535-
BatchingDelay: time.Second,
534+
RetentionInterval: "-1 hours",
535+
BatchingDelay: time.Second,
536536
}
537537

538538
_, err = NewPostgresWriter(ctx, connStr, opts)
@@ -546,7 +546,7 @@ func TestIntervalValidation(t *testing.T) {
546546
_, err = NewPostgresWriter(ctx, connStr, opts)
547547
a.EqualError(err, "--retention must be a positive PostgreSQL interval or 0 to disable it")
548548

549-
invalidIntervals := []string {
549+
invalidIntervals := []string{
550550
"not an interval", "3 dayss",
551551
"four hours",
552552
}
@@ -562,10 +562,10 @@ func TestIntervalValidation(t *testing.T) {
562562
a.Error(err)
563563
opts.MaintenanceInterval = "1 hour"
564564

565-
opts.Retention = interval
565+
opts.RetentionInterval = interval
566566
_, err = NewPostgresWriter(ctx, connStr, opts)
567567
a.Error(err)
568-
opts.Retention = "1 hour"
568+
opts.RetentionInterval = "1 hour"
569569
}
570570

571571
validIntervals := []string{
@@ -577,7 +577,7 @@ func TestIntervalValidation(t *testing.T) {
577577
for _, interval := range validIntervals {
578578
opts.PartitionInterval = interval
579579
opts.MaintenanceInterval = interval
580-
opts.Retention = interval
580+
opts.RetentionInterval = interval
581581

582582
_, err = NewPostgresWriter(ctx, connStr, opts)
583583
a.NoError(err)
@@ -603,10 +603,10 @@ func TestPartitionInterval(t *testing.T) {
603603
connStr, _ := pgContainer.ConnectionString(ctx, "sslmode=disable")
604604

605605
opts := &CmdOpts{
606-
PartitionInterval: "3 weeks",
607-
Retention: "14 days",
606+
PartitionInterval: "3 weeks",
607+
RetentionInterval: "14 days",
608608
MaintenanceInterval: "12 hours",
609-
BatchingDelay: time.Second,
609+
BatchingDelay: time.Second,
610610
}
611611

612612
pgw, err := NewPostgresWriter(ctx, connStr, opts)
@@ -625,7 +625,7 @@ func TestPartitionInterval(t *testing.T) {
625625
err = pgw.EnsureMetricDbnameTime(m, false)
626626
r.NoError(err)
627627

628-
var partitionsNum int;
628+
var partitionsNum int
629629
err = conn.QueryRow(ctx, "SELECT COUNT(*) FROM pg_partition_tree('test_metric');").Scan(&partitionsNum)
630630
a.NoError(err)
631631
// 1 the metric table itself + 1 dbname partition
@@ -634,7 +634,7 @@ func TestPartitionInterval(t *testing.T) {
634634

635635
part := partitionMapMetricDbname["test_metric"]["test_db"]
636636
// partition bounds should have a difference of 3 weeks
637-
a.Equal(part.StartTime.Add(3 * 7 * 24 * time.Hour), part.EndTime)
637+
a.Equal(part.StartTime.Add(3*7*24*time.Hour), part.EndTime)
638638
}
639639

640640
func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
@@ -659,10 +659,10 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
659659
r.NoError(err)
660660

661661
opts := &CmdOpts{
662-
PartitionInterval: "1 hour",
663-
Retention: "1 second",
662+
PartitionInterval: "1 hour",
663+
RetentionInterval: "1 second",
664664
MaintenanceInterval: "0 days",
665-
BatchingDelay: time.Hour,
665+
BatchingDelay: time.Hour,
666666
}
667667

668668
pgw, err := NewPostgresWriter(ctx, connStr, opts)
@@ -675,15 +675,15 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
675675

676676
var numOfEntries int
677677
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
678-
a.NoError(err)
678+
a.NoError(err)
679679
a.Equal(1, numOfEntries)
680680

681681
// manually call the maintenance routine
682682
pgw.MaintainUniqueSources()
683683

684684
// entry should have been deleted, because it has no corresponding entries in `test_metric_1` table.
685685
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
686-
a.NoError(err)
686+
a.NoError(err)
687687
a.Equal(0, numOfEntries)
688688

689689
message := []metrics.MeasurementEnvelope{
@@ -694,15 +694,15 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
694694
},
695695
DBName: "test_db",
696696
},
697-
}
697+
}
698698
pgw.flush(message)
699699

700700
// manually call the maintenance routine
701701
pgw.MaintainUniqueSources()
702702

703703
// entry should have been added, because there is a corresponding entry in `test_metric_1` table just written.
704704
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
705-
a.NoError(err)
705+
a.NoError(err)
706706
a.Equal(1, numOfEntries)
707707

708708
_, err = conn.Exec(ctx, "DROP TABLE test_metric_1;")
@@ -726,31 +726,31 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
726726
boundStart := time.Now().Add(-1 * 2 * 24 * time.Hour).Format("2006-01-02")
727727
boundEnd := time.Now().Add(-1 * 24 * time.Hour).Format("2006-01-02")
728728

729-
// create the 3rd level time partition with end bound yesterday
730-
_, err = conn.Exec(ctx,
729+
// create the 3rd level time partition with end bound yesterday
730+
_, err = conn.Exec(ctx,
731731
fmt.Sprintf(
732-
`CREATE TABLE subpartitions.test_metric_2_dbname_time
732+
`CREATE TABLE subpartitions.test_metric_2_dbname_time
733733
PARTITION OF subpartitions.test_metric_2_dbname
734-
FOR VALUES FROM ('%s') TO ('%s')`,
735-
boundStart, boundEnd),
734+
FOR VALUES FROM ('%s') TO ('%s')`,
735+
boundStart, boundEnd),
736736
)
737737
a.NoError(err)
738738
_, err = conn.Exec(ctx, "COMMENT ON TABLE subpartitions.test_metric_2_dbname_time IS $$pgwatch-generated-metric-dbname-time-lvl$$")
739739
a.NoError(err)
740740

741-
var partitionsNum int;
741+
var partitionsNum int
742742
err = conn.QueryRow(ctx, "SELECT COUNT(*) FROM pg_partition_tree('test_metric_2');").Scan(&partitionsNum)
743743
a.NoError(err)
744744
a.Equal(3, partitionsNum)
745745

746-
pgw.opts.Retention = "2 days"
746+
pgw.opts.RetentionInterval = "2 days"
747747
pgw.DeleteOldPartitions() // 1 day < 2 days, shouldn't delete anything
748748

749749
err = conn.QueryRow(ctx, "SELECT COUNT(*) FROM pg_partition_tree('test_metric_2');").Scan(&partitionsNum)
750750
a.NoError(err)
751751
a.Equal(3, partitionsNum)
752752

753-
pgw.opts.Retention = "1 hour"
753+
pgw.opts.RetentionInterval = "1 hour"
754754
pgw.DeleteOldPartitions() // 1 day > 1 hour, should delete the partition
755755

756756
err = conn.QueryRow(ctx, "SELECT COUNT(*) FROM pg_partition_tree('test_metric_2');").Scan(&partitionsNum)
@@ -760,23 +760,23 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
760760

761761
t.Run("Epoch to Duration Conversion", func(_ *testing.T) {
762762
table := map[string]time.Duration{
763-
"1 hour": time.Hour,
764-
"2 hours": 2 * time.Hour,
765-
"4 days": 4 * 24 * time.Hour,
766-
"1 day": 24 * time.Hour,
767-
"1 year": 365.25 * 24 * time.Hour,
768-
"1 week": 7 * 24 * time.Hour,
769-
"3 weeks": 3 * 7 * 24 * time.Hour,
763+
"1 hour": time.Hour,
764+
"2 hours": 2 * time.Hour,
765+
"4 days": 4 * 24 * time.Hour,
766+
"1 day": 24 * time.Hour,
767+
"1 year": 365.25 * 24 * time.Hour,
768+
"1 week": 7 * 24 * time.Hour,
769+
"3 weeks": 3 * 7 * 24 * time.Hour,
770770
"2 months": 2 * 30 * 24 * time.Hour,
771-
"1 month": 30 * 24 * time.Hour,
771+
"1 month": 30 * 24 * time.Hour,
772772
}
773773

774774
for k, v := range table {
775775
opts := &CmdOpts{
776-
PartitionInterval: "1 hour",
777-
Retention: k,
776+
PartitionInterval: "1 hour",
777+
RetentionInterval: k,
778778
MaintenanceInterval: k,
779-
BatchingDelay: time.Hour,
779+
BatchingDelay: time.Hour,
780780
}
781781

782782
pgw, err := NewPostgresWriter(ctx, connStr, opts)

0 commit comments

Comments
 (0)