Skip to content

Commit 7bc069d

Browse files
authored
[*] fix maintenance task to delete entries of dropped tables (#1020)
* introduce `--maintenance-interval` flag * modify `--retention` to accept PostgreSQL interval string. * Use new flags to control deletion and maintenance tasks. * Test `MaintainUniqueSources()` and `DeleteOldPartitions()` * update docs reference page to mention new flag. * run `DeleteOldPartitions()` with `retentionInterval` not `maintenanceInterval` * Rename test * Capitalize `set` in flag description. * Fix typo in comment * Update maintenance task to remove entries of dropped metric tables from listing table. * Test removing entries of dropped tables from the listing table. * Add empty line at end of file * Ensure there is no error dropping table * Use `!=` operator instead of `NOT ... = ...` * Preserve `public.` prefix in `sqlDistinct` query * substitue `break` with `continue` to avoid breaking the loop without trimming all elements in `allDistinctMetricTables` * this is more informative in the logs also.
1 parent a2d1faa commit 7bc069d

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

internal/sinks/postgres.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,7 @@ func (pgw *PostgresWriter) MaintainUniqueSources() {
520520
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
521521
sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
522522
sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
523+
sqlDroppedTables := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL($1)`
523524
sqlAdd := `
524525
INSERT INTO admin.all_distinct_dbname_metrics
525526
SELECT u, $2 FROM (select unnest($1::text[]) as u) x
@@ -545,17 +546,20 @@ func (pgw *PostgresWriter) MaintainUniqueSources() {
545546
return
546547
}
547548

548-
for _, tableName := range allDistinctMetricTables {
549+
for i, tableName := range allDistinctMetricTables {
549550
foundDbnamesMap := make(map[string]bool)
550551
foundDbnamesArr := make([]string, 0)
552+
551553
metricName := strings.Replace(tableName, "public.", "", 1)
554+
// later usage in sqlDroppedTables requires no "public." prefix
555+
allDistinctMetricTables[i] = metricName
552556

553557
logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
554558
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
555559
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
556560
if err != nil {
557-
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
558-
break
561+
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
562+
continue
559563
}
560564
for _, drDbname := range ret {
561565
foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
@@ -589,6 +593,12 @@ func (pgw *PostgresWriter) MaintainUniqueSources() {
589593
time.Sleep(time.Minute)
590594
}
591595

596+
cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDroppedTables, allDistinctMetricTables)
597+
if err != nil {
598+
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
599+
} else if cmdTag.RowsAffected() > 0 {
600+
logger.Infof("Removed %d stale entries for dropped tables from all_distinct_dbname_metrics listing table", cmdTag.RowsAffected())
601+
}
592602
}
593603

594604
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {

internal/sinks/postgres_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,15 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
704704
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
705705
a.NoError(err)
706706
a.Equal(1, numOfEntries)
707+
708+
_, err = conn.Exec(ctx, "DROP TABLE test_metric_1;")
709+
r.NoError(err)
710+
711+
// the corresponding entry should be deleted
712+
pgw.MaintainUniqueSources()
713+
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
714+
a.NoError(err)
715+
a.Equal(0, numOfEntries)
707716
})
708717

709718
t.Run("DeleteOldPartitions", func(_ *testing.T) {

0 commit comments

Comments
 (0)