From e80e8dc3ea047caea3e9f15757f204af6ae7fe1f Mon Sep 17 00:00:00 2001 From: Matt Tucker Date: Tue, 19 Aug 2025 17:01:07 +0000 Subject: [PATCH] fix(ingestor): retry with backoff management commands on 429 and >=500 errors --- ingestor/adx/retry.go | 80 ++++++++++++++++++++++++++++++++++++++++ ingestor/adx/syncer.go | 42 +++++++++++++++------ ingestor/adx/uploader.go | 8 +++- 3 files changed, 116 insertions(+), 14 deletions(-) create mode 100644 ingestor/adx/retry.go diff --git a/ingestor/adx/retry.go b/ingestor/adx/retry.go new file mode 100644 index 000000000..26bca211c --- /dev/null +++ b/ingestor/adx/retry.go @@ -0,0 +1,80 @@ +package adx + +import ( + "context" + "errors" + "fmt" + "net/http" + "time" + + kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/Azure/adx-mon/pkg/logger" +) + +// isThrottled returns true if the error indicates the request was throttled by Kusto. +func isThrottled(err error) bool { + var he *kustoerrors.HttpError + if errors.As(err, &he) { + return he.IsThrottled() + } + return false +} + +// isHTTP5xx returns true if the error is an HttpError with 5xx status code. +func isHTTP5xx(err error) bool { + var he *kustoerrors.HttpError + if errors.As(err, &he) { + return he.StatusCode >= http.StatusInternalServerError && he.StatusCode <= 599 + } + return false +} + +// isTransientKusto returns true if the error is considered retryable by the SDK Retry predicate. +func isTransientKusto(err error) bool { + return kustoerrors.Retry(err) +} + +// retryMgmt wraps a Kusto management command with exponential backoff using the +// Kubernetes wait utilities. Retries are attempted for: +// - Throttling (HTTP 429) +// - HTTP 5xx responses +// - Transient errors recognized by the Kusto SDK's errors.Retry predicate +// +// Backoff parameters: start 500ms, factor 2.0, jitter 0.25, cap 10s, steps 8 (~1m upper bound). +func retryMgmt(ctx context.Context, desc string, fn func() error) error { + backoff := wait.Backoff{ + Duration: 500 * time.Millisecond, + Factor: 2.0, + Jitter: 0.25, + Steps: 8, + Cap: 10 * time.Second, + } + + var lastErr error + err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (done bool, err error) { + if ctx.Err() != nil { + return true, ctx.Err() + } + e := fn() + if e == nil { + return true, nil + } + // Decide retry conditions: + shouldRetry := isThrottled(e) || isHTTP5xx(e) || isTransientKusto(e) + if !shouldRetry { + return true, e + } + lastErr = e + logger.Warnf("Retrying %s due to transient error: %v", desc, e) + return false, nil + }) + if err == nil { + return nil + } + if lastErr != nil { + return fmt.Errorf("%s: exhausted retries: %w", desc, lastErr) + } + return fmt.Errorf("%s: %w", desc, err) +} diff --git a/ingestor/adx/syncer.go b/ingestor/adx/syncer.go index 106bc2078..6ad9d32e4 100644 --- a/ingestor/adx/syncer.go +++ b/ingestor/adx/syncer.go @@ -103,8 +103,12 @@ func (s *Syncer) Close() error { func (s *Syncer) loadIngestionMappings(ctx context.Context) error { query := fmt.Sprintf(".show database %s ingestion mappings", s.database) stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(query) - rows, err := s.KustoCli.Mgmt(ctx, s.database, stmt) - if err != nil { + var rows *kusto.RowIterator + if err := retryMgmt(ctx, "loadIngestionMappings", func() error { + var err error + rows, err = s.KustoCli.Mgmt(ctx, s.database, stmt) + return err + }); err != nil { return err } @@ -176,8 +180,12 @@ func (s *Syncer) EnsureTable(table string, mapping schema.SchemaMapping) error { showStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(sb.String()) - rows, err := s.KustoCli.Mgmt(context.Background(), s.database, showStmt) - if err != nil { + var rows *kusto.RowIterator + if err := retryMgmt(context.Background(), "ensure-table", func() error { + var err error + rows, err = s.KustoCli.Mgmt(context.Background(), s.database, showStmt) + return err + }); err != nil { return err } @@ -249,8 +257,12 @@ func (s *Syncer) EnsureMapping(table string, mapping schema.SchemaMapping) (stri showStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(sb.String()) - rows, err := s.KustoCli.Mgmt(context.Background(), s.database, showStmt) - if err != nil { + var rows *kusto.RowIterator + if err := retryMgmt(context.Background(), "ensure-mapping", func() error { + var err error + rows, err = s.KustoCli.Mgmt(context.Background(), s.database, showStmt) + return err + }); err != nil { return "", err } @@ -342,16 +354,20 @@ func (s *Syncer) ensurePromMetricsFunctions(ctx context.Context) error { // but we can't create the function unless a table exists. stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd( ".create table AdxmonIngestorTableCardinalityCount (Timestamp: datetime, SeriesId: long, Labels: dynamic, Value: real)") - _, err := s.KustoCli.Mgmt(ctx, s.database, stmt) - if err != nil { + if err := retryMgmt(ctx, "ensure-prom-functions-create-cardinality-table", func() error { + _, err := s.KustoCli.Mgmt(ctx, s.database, stmt) + return err + }); err != nil { return err } for _, fn := range functions { logger.Infof("Creating function %s", fn.name) stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fn.body) - _, err := s.KustoCli.Mgmt(ctx, s.database, stmt) - if err != nil { + if err := retryMgmt(ctx, "create-function-"+fn.name, func() error { + _, err := s.KustoCli.Mgmt(ctx, s.database, stmt) + return err + }); err != nil { return err } } @@ -394,8 +410,10 @@ func (s *Syncer) ensureIngestionPolicy(ctx context.Context) error { stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd( fmt.Sprintf(".alter-merge database %s policy ingestionbatching\n```%s\n```", s.database, string(b))) - _, err = s.KustoCli.Mgmt(ctx, s.database, stmt) - if err != nil { + if err := retryMgmt(ctx, "ensure-ingestion-policy", func() error { + _, err = s.KustoCli.Mgmt(ctx, s.database, stmt) + return err + }); err != nil { return err } return nil diff --git a/ingestor/adx/uploader.go b/ingestor/adx/uploader.go index 4eb654492..0f6a6fe54 100644 --- a/ingestor/adx/uploader.go +++ b/ingestor/adx/uploader.go @@ -355,8 +355,12 @@ func (n *uploader) extractSchema(path string) (string, error) { // https://learn.microsoft.com/en-us/azure/data-explorer/kusto-emulator-overview#limitations func (n *uploader) clusterRequiresDirectIngest(ctx context.Context) (bool, error) { stmt := kql.New(".show cluster details") - rows, err := n.KustoCli.Mgmt(ctx, n.database, stmt) - if err != nil { + var rows *kusto.RowIterator + if err := retryMgmt(ctx, "show-cluster-details", func() error { + var err error + rows, err = n.KustoCli.Mgmt(ctx, n.database, stmt) + return err + }); err != nil { return false, fmt.Errorf("failed to query cluster details: %w", err) } defer rows.Stop()