10 changes: 8 additions & 2 deletions docs/reference/cli_env.md
@@ -98,17 +98,23 @@ It reads the configuration from the specified sources and metrics, then begins c
ENV: `$PW_BATCHING_DELAY`
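
Example (illustrative value; the flag takes a Go duration):
`--batching-delay="500ms"`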

- `--partition-interval=`
Time range for PostgreSQL sink table partitions. Must be a valid PostgreSQL interval expression. (default: 1 week)

Time range for PostgreSQL sink table partitions. Must be a valid PostgreSQL interval. (default: 1 week)
ENV: `$PW_PARTITION_INTERVAL`

Example:
`--partition-interval="3 weeks 4 days"`

- `--retention=`

If set, metrics older than that will be deleted (default: "14 days").
Delete metrics older than this. Set to zero to disable. Must be a valid PostgreSQL interval. (default: 14 days)
ENV: `$PW_RETENTION`
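
Example (illustrative value; any valid PostgreSQL interval works):
`--retention="30 days"`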

- `--maintenance-interval=`

Run pgwatch maintenance tasks on sinks with this interval; set to zero to disable. Must be a valid PostgreSQL interval. (default: 12 hours)
ENV: `$PW_MAINTENANCE_INTERVAL`
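
Example (illustrative value; any valid PostgreSQL interval works):
`--maintenance-interval="24 hours"`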

- `--real-dbname-field=`

Tag key for real database name (default: real_dbname).
5 changes: 3 additions & 2 deletions internal/sinks/cmdopts.go
Expand Up @@ -6,8 +6,9 @@ import "time"
type CmdOpts struct {
Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
BatchingDelay time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Sink-specific batching flush delay; may be ignored by some sinks" default:"950ms" env:"PW_BATCHING_DELAY"`
PartitionInterval string `long:"partition-interval" mapstructure:"partition-interval" description:"Time range for PostgreSQL sink table partitions. Must be a valid PostgreSQL interval expression." default:"1 week" env:"PW_PARTITION_INTERVAL"`
Retention string `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14 days" env:"PW_RETENTION"`
PartitionInterval string `long:"partition-interval" mapstructure:"partition-interval" description:"Time range for PostgreSQL sink time partitions. Must be a valid PostgreSQL interval." default:"1 week" env:"PW_PARTITION_INTERVAL"`
RetentionInterval string `long:"retention" mapstructure:"retention" description:"Delete metrics older than this. Set to zero to disable. Must be a valid PostgreSQL interval." default:"14 days" env:"PW_RETENTION"`
MaintenanceInterval string `long:"maintenance-interval" mapstructure:"maintenance-interval" description:"Run pgwatch maintenance tasks on sinks with this interval; set to zero to disable. Must be a valid PostgreSQL interval." default:"12 hours" env:"PW_MAINTENANCE_INTERVAL"`
RealDbnameField string `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
SystemIdentifierField string `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`
}
230 changes: 122 additions & 108 deletions internal/sinks/postgres.go
@@ -25,6 +25,21 @@ var (
targetColumns = [...]string{"time", "dbname", "data", "tag_data"}
)

// PostgresWriter is a sink that writes metric measurements to a Postgres database.
// At the moment, it supports both Postgres and TimescaleDB as a storage backend.
// However, one is able to use any Postgres-compatible database as a storage backend,
// e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
type PostgresWriter struct {
ctx context.Context
sinkDb db.PgxPoolIface
metricSchema DbStorageSchemaType
opts *CmdOpts
retentionInterval time.Duration
maintenanceInterval time.Duration
input chan metrics.MeasurementEnvelope
lastError chan error
}

func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
var conn db.PgxPoolIface
if conn, err = db.New(ctx, connstr); err != nil {
@@ -43,18 +58,27 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
lastError: make(chan error),
sinkDb: conn,
}
var runDeleteOldPartitions bool
if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
if err = conn.QueryRow(ctx, "SELECT $1::interval > '0'::interval", opts.Retention).Scan(&runDeleteOldPartitions); err != nil {
var isValidPartitionInterval bool
if err = conn.QueryRow(ctx,
"SELECT extract(epoch from $1::interval), extract(epoch from $2::interval), $3::interval >= '1h'::interval",
opts.RetentionInterval, opts.MaintenanceInterval, opts.PartitionInterval,
).Scan(&pgw.retentionInterval, &pgw.maintenanceInterval, &isValidPartitionInterval); err != nil {
return err
}
var isValidInterval bool
err = conn.QueryRow(ctx, "SELECT $1::interval >= '1h'::interval", opts.PartitionInterval).Scan(&isValidInterval)
if err != nil {
return err

// epoch returns seconds but time.Duration represents nanoseconds
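// e.g. extract(epoch from '14 days'::interval) = 1209600, and 1209600 * time.Second = 336h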
pgw.retentionInterval *= time.Second
pgw.maintenanceInterval *= time.Second

if !isValidPartitionInterval {
return fmt.Errorf("--partition-interval must be at least 1 hour, got: %s", opts.PartitionInterval)
}
if pgw.maintenanceInterval < 0 {
return errors.New("--maintenance-interval must be a positive PostgreSQL interval or 0 to disable it")
}
if !isValidInterval {
return fmt.Errorf("partition interval must be at least 1 hour, got: %s", opts.PartitionInterval)
if pgw.retentionInterval < 0 {
return errors.New("--retention must be a positive PostgreSQL interval or 0 to disable it")
}

l.Info("initialising measurements database...")
Expand All @@ -77,10 +101,10 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
return
}
if runDeleteOldPartitions {
go pgw.deleteOldPartitions()
}
go pgw.maintainUniqueSources()

pgw.scheduleJob(pgw.maintenanceInterval, pgw.MaintainUniqueSources)
pgw.scheduleJob(pgw.retentionInterval, pgw.DeleteOldPartitions)

go pgw.poll()
l.Info(`measurements sink is activated`)
return
@@ -115,19 +139,6 @@ var (
}
)

// PostgresWriter is a sink that writes metric measurements to a Postgres database.
// At the moment, it supports both Postgres and TimescaleDB as a storage backend.
// However, one is able to use any Postgres-compatible database as a storage backend,
// e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
type PostgresWriter struct {
ctx context.Context
sinkDb db.PgxPoolIface
metricSchema DbStorageSchemaType
opts *CmdOpts
input chan metrics.MeasurementEnvelope
lastError chan error
}

type ExistingPartitionInfo struct {
StartTime time.Time
EndTime time.Time
@@ -148,6 +159,21 @@ const (
DbStorageSchemaTimescale
)

func (pgw *PostgresWriter) scheduleJob(interval time.Duration, job func()) {
if interval > 0 {
go func() {
for {
select {
case <-pgw.ctx.Done():
return
case <-time.After(interval):
job()
}
}
}()
}
}
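
// Usage sketch (illustrative): a non-positive interval means the job is never
// scheduled; a positive one runs it in its own goroutine until the context ends:
//
//	pgw.scheduleJob(12*time.Hour, pgw.MaintainUniqueSources)
//	pgw.scheduleJob(0, pgw.DeleteOldPartitions) // zero interval: job disabled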

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
var isTs bool
pgw.metricSchema = DbStorageSchemaPostgres
@@ -464,117 +490,105 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
return nil
}

// deleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (pgw *PostgresWriter) deleteOldPartitions() {
// DeleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (pgw *PostgresWriter) DeleteOldPartitions() {
l := log.GetLogger(pgw.ctx)
for {
var partsDropped int
err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
pgw.opts.Retention).Scan(&partsDropped)
if err != nil {
l.Error("Could not drop old time partitions:", err)
} else if partsDropped > 0 {
l.Infof("Dropped %d old time partitions", partsDropped)
}
select {
case <-pgw.ctx.Done():
return
case <-time.After(time.Hour * 12):
}
var partsDropped int
err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
pgw.opts.RetentionInterval).Scan(&partsDropped)
if err != nil {
l.Error("Could not drop old time partitions:", err)
} else if partsDropped > 0 {
l.Infof("Dropped %d old time partitions", partsDropped)
}
}

// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
// MaintainUniqueSources is a background task that maintains a mapping of unique sources
// in each metric table in admin.all_distinct_dbname_metrics.
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) maintainUniqueSources() {
func (pgw *PostgresWriter) MaintainUniqueSources() {
logger := log.GetLogger(pgw.ctx)
// due to metrics deletion the listing can go out of sync (a trigger is not really wanted)

sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
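// sqlDistinct emulates a "loose index scan": the recursive CTE hops from the
// current minimum dbname to the next one via an index lookup, so listing the
// distinct dbnames stays cheap even on very large metric tables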
sqlDistinct := `
WITH RECURSIVE t(dbname) AS (
SELECT MIN(dbname) AS dbname FROM %s
UNION
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t
)
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
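// sqlAdd unnests the collected dbnames into rows and inserts only the
// (dbname, metric) pairs that are not present in the listing table yet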
sqlAdd := `
INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
INSERT INTO admin.all_distinct_dbname_metrics
SELECT u, $2 FROM (select unnest($1::text[]) as u) x
WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
RETURNING *`

for {
select {
case <-pgw.ctx.Done():
return
case <-time.After(time.Hour * 24):
}
var lock bool
logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
continue
}
if !lock {
logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
continue
}
var lock bool
logger.Infof("Trying to get admin.all_distinct_dbname_metrics maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
logger.Error("Getting admin.all_distinct_dbname_metrics maintainer advisory lock failed:", err)
return
}
if !lock {
logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
return
}

logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
if err != nil {
logger.Error(err)
continue
}
logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
if err != nil {
logger.Error(err)
return
}

for _, tableName := range allDistinctMetricTables {
foundDbnamesMap := make(map[string]bool)
foundDbnamesArr := make([]string, 0)
metricName := strings.Replace(tableName, "public.", "", 1)
for _, tableName := range allDistinctMetricTables {
foundDbnamesMap := make(map[string]bool)
foundDbnamesArr := make([]string, 0)
metricName := strings.Replace(tableName, "public.", "", 1)

logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
// ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName))
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
break
}
for _, drDbname := range ret {
foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
}
logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
break
}
for _, drDbname := range ret {
foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
}

// delete all that are not known and add all that are not there
for k := range foundDbnamesMap {
foundDbnamesArr = append(foundDbnamesArr, k)
}
if len(foundDbnamesArr) == 0 { // delete all entries for given metric
logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)
// delete all that are not known and add all that are not there
for k := range foundDbnamesMap {
foundDbnamesArr = append(foundDbnamesArr, k)
}
if len(foundDbnamesArr) == 0 { // delete all entries for given metric
logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
if err != nil {
logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
}
continue
}
cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
}
cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
}
time.Sleep(time.Minute)
continue
}

cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
}
cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
}
time.Sleep(time.Minute)
}

}

func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {