From 3bcf8be838e4ffea413a6597ec19586e2b1c4203 Mon Sep 17 00:00:00 2001 From: Jesse Thompson Date: Fri, 1 Aug 2025 15:50:08 +0000 Subject: [PATCH 1/9] docs: add comprehensive SummaryRule backfill feature implementation plan - Defines backfill spec with start/end datetime and operationId tracking - Uses serial execution to prevent overwhelming Kusto with expensive queries - Implements progress tracking via spec mutation with natural completion - Includes 6-task implementation plan with comprehensive testing - Maintains backwards compatibility with existing SummaryRules - Adds 30-day max lookback validation and proper error handling --- .github/plans/summaryrules/backfill.md | 297 +++++++++++++++++++++++++ 1 file changed, 297 insertions(+) create mode 100644 .github/plans/summaryrules/backfill.md diff --git a/.github/plans/summaryrules/backfill.md b/.github/plans/summaryrules/backfill.md new file mode 100644 index 000000000..f1f010a54 --- /dev/null +++ b/.github/plans/summaryrules/backfill.md @@ -0,0 +1,297 @@ +# SummaryRule Backfill Feature Implementation Plan + +## Planning Progress +- [x] 🔍 Feature requirements gathered +- [x] 📚 Codebase patterns analyzed +- [x] 🌐 Third-party research complete +- [x] 🏗️ Architecture decisions made +- [x] 💬 User validation complete +- [x] 📋 Implementation plan finalized + +## Feature Overview +This plan outlines the implementation of a backfill feature for SummaryRules that allows users to specify a date range for historical data processing. The feature will enable SummaryRules to process historical data by automatically advancing through time windows from a specified start datetime until reaching an end datetime. + +## Context Research Summary + +### Current SummaryRule Architecture +I've analyzed the existing SummaryRule implementation and identified these key patterns: + +**Current Execution Flow:** +1. `SummaryRuleTask.Run()` processes each rule individually +2. `shouldProcessRule()` filters by database and criteria matching +3. `handleRuleExecution()` determines if a rule should submit based on interval timing +4. `NextExecutionWindow()` calculates time ranges based on last execution +5. Async operations are tracked via Kusto operation IDs in CRD status conditions + +**Key Components:** +- `SummaryRuleSpec`: Contains interval, database, table, body (KQL), and criteria +- `AsyncOperation`: Tracks individual query submissions with OperationId, StartTime, EndTime +- Time substitution via `kustoutil.ApplySubstitutions()` using `_startTime` and `_endTime` variables +- Uses RFC3339Nano format for datetime values in Kusto queries + +**Current Timing Logic:** +- Rules execute on fixed intervals from last successful execution +- `BackfillAsyncOperations()` already exists to catch up missed intervals +- Time windows are aligned to interval boundaries with optional ingestion delay + +### Third-Party Research Findings +**Kusto DateTime Handling:** +- Supports ISO 8601 formats (RFC3339 compatible) +- Uses UTC time zone exclusively +- Minimum time unit is 100 nanoseconds ("ticks") +- Prefers `datetime(2024-01-01T00:00:00Z)` format for literals + +## User Requirements Analysis + +Based on user feedback: + +### **Backfill Specification:** +```yaml +spec: + backfill: + start: "2024-01-01T00:00:00Z" # Required start datetime + end: "2024-01-31T23:59:59Z" # Required end datetime +``` + +### **Key Design Decisions:** +1. **Parallel Execution**: Backfill operations run alongside normal interval execution +2. **Auto-cleanup**: System removes `Backfill` field when complete +3. **Reconcile Integration**: Backfill as standard reconcile operation (submit interval, then backfill if needed) +4. **Interval Alignment**: Use same interval boundaries and `IngestionDelay` patterns +5. **Validation**: Start < End, max 30-day lookback period +6. **User Responsibility**: Users handle data overlap concerns + +### **Critical Design Question: Operation Tracking** + +## Architectural Decision: Single Backfill Operation Tracking + +**Implementation Strategy:** +We will track backfill progress by storing the current operation directly in the spec: + +```yaml +spec: + backfill: + start: "2024-01-01T00:00:00Z" + end: "2024-01-31T23:59:59Z" + operationId: "some-async-operation-id" # Current operation +``` + +**Rationale:** +- **Prevents Kusto Overload**: Serial execution avoids overwhelming Kusto with expensive backfill queries +- **Steady Progress**: Reconcile loop (1-minute intervals) ensures consistent advancement +- **Simple State Management**: Single operation tracking is straightforward to implement and debug +- **Natural Completion**: When `start >= end`, backfill is complete and field can be removed +- **Clear Progress Visibility**: Users can easily see current backfill progress in the spec + +**Implementation Details:** +```go +type BackfillSpec struct { + Start string `json:"start"` // Current backfill position + End string `json:"end"` // Target end time + OperationId string `json:"operationId,omitempty"` // Active operation ID +} +``` + +**Execution Flow:** +1. If `operationId` is empty and `start < end`: Submit next backfill window, store operation ID +2. If `operationId` exists: Check operation status in Kusto +3. If operation completes successfully: Advance `start` to window end time, clear `operationId` +4. If operation fails: Handle error, clear `operationId` for retry +5. If `start >= end`: Remove entire `backfill` field from spec + +## Risk Assessment Matrix + +### **Overall Feature Risks:** +- **CRD Update Risk**: Breaking changes to SummaryRule API + - *Mitigation*: Use optional fields with proper validation +- **Spec Mutation Conflicts**: User editing spec during backfill + - *Mitigation*: Document behavior, use generation tracking +- **Performance Impact**: Backfill operations affecting normal processing + - *Mitigation*: Serial execution prevents overload + +### **Integration Risks:** +- **Kusto Operation Tracking**: Backfill operations not tracked in AsyncOperations + - *Mitigation*: Query Kusto directly using operation ID +- **Time Window Calculation**: Incorrect interval alignment + - *Mitigation*: Reuse existing `NextExecutionWindow` patterns +- **Validation Edge Cases**: Invalid date ranges, timezone issues + - *Mitigation*: Comprehensive validation with proper error messages + +## Implementation Plan + +### **Task 1: Add Backfill Spec to SummaryRule CRD** +**Commit Message**: `feat: add backfill specification to SummaryRule CRD` + +**Objective**: Extend SummaryRule API to support backfill operations + +**Implementation Details:** +- Add `BackfillSpec` struct to `api/v1/summaryrule_types.go` +- Add optional `Backfill *BackfillSpec` field to `SummaryRuleSpec` +- Add kubebuilder validation tags for date format and max lookback period +- Add const for max backfill lookback period (30 days) +- Run `make generate-crd CMD=update` to generate kubebuilder manifests after modifying the spec. + +**Files to modify:** +- `api/v1/summaryrule_types.go` + +**Validation Criteria:** +- CRD accepts valid backfill specifications +- Validation rejects invalid date ranges and excessive lookback periods +- Existing SummaryRules continue to work unchanged + +**Review Guidance:** +- Focus on validation logic correctness +- Ensure backwards compatibility + +--- + +### **Task 2: Add Backfill Helper Methods** +**Commit Message**: `feat: add backfill helper methods to SummaryRule` + +**Objective**: Implement core backfill logic methods on SummaryRule type + +**Implementation Details:** +- Add `HasActiveBackfill() bool` method +- Add `GetNextBackfillWindow(clock.Clock) (time.Time, time.Time, bool)` method +- Add `AdvanceBackfillProgress(endTime time.Time)` method +- Add `ClearBackfillOperation()` method +- Add `IsBackfillComplete() bool` method + +**Files to modify:** +- `api/v1/summaryrule_types.go` + +**Testing:** +- Unit tests for all helper methods with various scenarios +- Test boundary conditions (completion, invalid states) + +**Validation Criteria:** +- Methods correctly calculate next backfill windows +- Progress advancement works with interval alignment +- Completion detection is accurate + +--- + +### **Task 3: Integrate Backfill into SummaryRuleTask** +**Commit Message**: `feat: integrate backfill execution into SummaryRuleTask` + +**Objective**: Modify task execution to handle backfill operations + +**Implementation Details:** +- Add `handleBackfillExecution(ctx, rule)` method to `SummaryRuleTask` +- Modify `Run()` method to call backfill handler after normal execution +- Add backfill operation status checking in `trackAsyncOperations()` +- Implement backfill completion cleanup + +**Files to modify:** +- `ingestor/adx/tasks.go` + +**Integration Notes:** +- Reuse existing `submitRule()` method for backfill operations +- Use same Kusto operation polling infrastructure +- Ensure backfill doesn't interfere with normal operation tracking + +**Validation Criteria:** +- Backfill operations submit correctly +- Progress advances after successful operations +- Failed operations retry appropriately +- Backfill completes and cleans up properly + +--- + +### **Task 4: Add Comprehensive Backfill Unit Tests** +**Commit Message**: `test: add comprehensive unit tests for backfill functionality` + +**Objective**: Ensure backfill feature works correctly across all scenarios + +**Implementation Details:** +- Test backfill window calculation with various intervals +- Test progress advancement and completion detection +- Test error handling and retry scenarios +- Test integration with normal rule execution +- Test edge cases (invalid dates, completed backfills) + +**Files to modify:** +- `api/v1/summaryrule_types_test.go` +- `ingestor/adx/tasks_test.go` + +**Testing Coverage:** +- All backfill helper methods +- Integration with SummaryRuleTask +- Error conditions and edge cases +- Clock-dependent behavior with fake clocks + +--- + +### **Task 5: Update CRD Manifests** +**Commit Message**: `feat: regenerate CRD manifests for backfill support` + +**Objective**: Update generated CRD YAML files with backfill fields + +**Implementation Details:** +- Run `make generate-crd CMD=update` to regenerate manifests +- Verify generated YAML includes proper validation rules +- Update operator manifests + +**Files to modify:** +- `kustomize/bases/adx-mon.azure.com_summaryrules.yaml` +- `operator/manifests/crds/adx-mon.azure.com_summaryrules.yaml` + +**Validation Criteria:** +- Generated CRDs include backfill fields +- Validation rules are properly applied +- No breaking changes to existing fields + +--- + +### **Task 6: Add Documentation and Examples** +**Commit Message**: `docs: add backfill feature documentation and examples` + +**Objective**: Document the backfill feature for users + +**Implementation Details:** +- Add backfill section to SummaryRule documentation +- Create example YAML manifests showing backfill usage +- Document limitations and best practices + +**Files to modify:** +- `docs/crds.md` (or relevant SummaryRule documentation) +- Create example files in appropriate documentation location + +**Documentation Coverage:** +- Backfill field specification +- Example use cases +- Limitations and performance considerations +- Troubleshooting common issues + +## Implementation Notes + +### **Error Handling Patterns:** +- Use existing Kusto error parsing via `kustoutil.ParseError()` +- Log backfill progress at Info level for visibility +- Handle operation failures with same retry logic as normal operations + +### **Configuration:** +- Max backfill lookback period: 30 days (const in CRD types file) +- Backfill operations use same `IngestionDelay` as normal operations +- Interval alignment follows existing `NextExecutionWindow` patterns + +### **Backward Compatibility:** +- All backfill fields are optional +- Existing SummaryRules work unchanged +- No migration required for existing deployments + +### **Performance Considerations:** +- Serial backfill execution prevents Kusto overload +- Backfill operations use same async operation infrastructure +- Progress visibility through spec inspection + +## Documentation Plan + +### **Existing Documentation Updates:** +- Update `docs/crds.md` to document backfill field +- Add backfill examples to existing SummaryRule documentation + +### **New Documentation:** +- Backfill troubleshooting guide +- Performance impact documentation +- Best practices for large date ranges From 945937e4de2ef152b98d128ab0202a4519571415 Mon Sep 17 00:00:00 2001 From: Jesse Thompson Date: Fri, 1 Aug 2025 16:05:38 +0000 Subject: [PATCH 2/9] feat: add backfill specification to SummaryRule CRD - Add BackfillSpec struct with start, end, and operationId fields - Add optional Backfill field to SummaryRuleSpec with validation - Add 30-day max lookback period constant - Generate updated CRD manifests with kubebuilder validation - Maintain backwards compatibility with existing SummaryRules --- api/v1/summaryrule_types.go | 28 +++++++++++++++++++ api/v1/zz_generated.deepcopy.go | 20 +++++++++++++ kustomize/bases/summaryrules_crd.yaml | 26 +++++++++++++++++ operator/manifests/crds/summaryrules_crd.yaml | 26 +++++++++++++++++ 4 files changed, 100 insertions(+) diff --git a/api/v1/summaryrule_types.go b/api/v1/summaryrule_types.go index e79313e64..10c715361 100644 --- a/api/v1/summaryrule_types.go +++ b/api/v1/summaryrule_types.go @@ -39,8 +39,29 @@ const ( // the status of an async operation. This value is somewhat arbitrary, but the intent // is to not overwhelm the service with requests. SummaryRuleAsyncOperationPollInterval = 10 * time.Minute + // SummaryRuleMaxBackfillLookback is the maximum allowed lookback period for backfill operations + SummaryRuleMaxBackfillLookback = 30 * 24 * time.Hour // 30 days ) +// BackfillSpec defines the configuration for backfilling historical data +type BackfillSpec struct { + // Start is the current backfill position (RFC3339 format) + // +kubebuilder:validation:Required + // +kubebuilder:validation:Type=string + // +kubebuilder:validation:Format=date-time + Start string `json:"start"` + + // End is the target end time for backfill (RFC3339 format) + // +kubebuilder:validation:Required + // +kubebuilder:validation:Type=string + // +kubebuilder:validation:Format=date-time + End string `json:"end"` + + // OperationId is the current active Kusto operation ID for this backfill window + // +optional + OperationId string `json:"operationId,omitempty"` +} + // SummaryRuleSpec defines the desired state of SummaryRule type SummaryRuleSpec struct { // Database is the name of the database in which the function will be created @@ -66,6 +87,13 @@ type SummaryRuleSpec struct { // started with `--cluster-labels=region=eastus`. If a SummaryRule has `criteria: {region: [eastus]}`, then the rule will only // execute on that ingestor. Any key/values pairs must match (case-insensitive) for the rule to execute. Criteria map[string][]string `json:"criteria,omitempty"` + + // Backfill specifies historical data processing configuration. When specified, the rule will process + // historical data from the start datetime to the end datetime in addition to normal interval execution. + // The system will automatically advance through time windows and remove this field when complete. + // +optional + // +kubebuilder:validation:XValidation:rule="!has(self) || (has(self.start) && has(self.end))",message="backfill start and end are required when backfill is specified" + Backfill *BackfillSpec `json:"backfill,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index b12b62b84..585e1f06e 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -466,6 +466,21 @@ func (in *AsyncOperation) DeepCopy() *AsyncOperation { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BackfillSpec) DeepCopyInto(out *BackfillSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackfillSpec. +func (in *BackfillSpec) DeepCopy() *BackfillSpec { + if in == nil { + return nil + } + out := new(BackfillSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Collector) DeepCopyInto(out *Collector) { *out = *in @@ -1059,6 +1074,11 @@ func (in *SummaryRuleSpec) DeepCopyInto(out *SummaryRuleSpec) { (*out)[key] = outVal } } + if in.Backfill != nil { + in, out := &in.Backfill, &out.Backfill + *out = new(BackfillSpec) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SummaryRuleSpec. diff --git a/kustomize/bases/summaryrules_crd.yaml b/kustomize/bases/summaryrules_crd.yaml index 1e623e5ef..b2210e4fa 100644 --- a/kustomize/bases/summaryrules_crd.yaml +++ b/kustomize/bases/summaryrules_crd.yaml @@ -39,6 +39,32 @@ spec: spec: description: SummaryRuleSpec defines the desired state of SummaryRule properties: + backfill: + description: |- + Backfill specifies historical data processing configuration. When specified, the rule will process + historical data from the start datetime to the end datetime in addition to normal interval execution. + The system will automatically advance through time windows and remove this field when complete. + properties: + end: + description: End is the target end time for backfill (RFC3339 + format) + format: date-time + type: string + operationId: + description: OperationId is the current active Kusto operation + ID for this backfill window + type: string + start: + description: Start is the current backfill position (RFC3339 format) + format: date-time + type: string + required: + - end + - start + type: object + x-kubernetes-validations: + - message: backfill start and end are required when backfill is specified + rule: '!has(self) || (has(self.start) && has(self.end))' body: description: Body is the KQL body of the function type: string diff --git a/operator/manifests/crds/summaryrules_crd.yaml b/operator/manifests/crds/summaryrules_crd.yaml index 1e623e5ef..b2210e4fa 100644 --- a/operator/manifests/crds/summaryrules_crd.yaml +++ b/operator/manifests/crds/summaryrules_crd.yaml @@ -39,6 +39,32 @@ spec: spec: description: SummaryRuleSpec defines the desired state of SummaryRule properties: + backfill: + description: |- + Backfill specifies historical data processing configuration. When specified, the rule will process + historical data from the start datetime to the end datetime in addition to normal interval execution. + The system will automatically advance through time windows and remove this field when complete. + properties: + end: + description: End is the target end time for backfill (RFC3339 + format) + format: date-time + type: string + operationId: + description: OperationId is the current active Kusto operation + ID for this backfill window + type: string + start: + description: Start is the current backfill position (RFC3339 format) + format: date-time + type: string + required: + - end + - start + type: object + x-kubernetes-validations: + - message: backfill start and end are required when backfill is specified + rule: '!has(self) || (has(self.start) && has(self.end))' body: description: Body is the KQL body of the function type: string From 82d30017caff681aae225d80210f1600bb6a9e57 Mon Sep 17 00:00:00 2001 From: Jesse Thompson Date: Fri, 1 Aug 2025 16:12:24 +0000 Subject: [PATCH 3/9] feat: add backfill helper methods to SummaryRule - Add HasActiveBackfill() to check for active backfill configuration - Add IsBackfillComplete() to detect completion - Add GetNextBackfillWindow() for time window calculation with interval alignment - Add AdvanceBackfillProgress() to move backfill position forward - Add operation management methods (Set/Get/Clear BackfillOperation) - Add RemoveBackfill() for cleanup when complete - Include comprehensive unit tests covering all edge cases - Handle ingestion delay correctly in window calculations --- api/v1/summaryrule_types.go | 120 ++++++++++++ api/v1/summaryrule_types_test.go | 308 +++++++++++++++++++++++++++++++ 2 files changed, 428 insertions(+) diff --git a/api/v1/summaryrule_types.go b/api/v1/summaryrule_types.go index 10c715361..2656d415b 100644 --- a/api/v1/summaryrule_types.go +++ b/api/v1/summaryrule_types.go @@ -468,6 +468,126 @@ func (s *SummaryRule) BackfillAsyncOperations(clk clock.Clock) { meta.SetStatusCondition(&s.Status.Conditions, *condition) } +// HasActiveBackfill returns true if the rule has an active backfill configuration +func (s *SummaryRule) HasActiveBackfill() bool { + return s.Spec.Backfill != nil && s.Spec.Backfill.Start != "" && s.Spec.Backfill.End != "" +} + +// IsBackfillComplete returns true if the backfill has reached or passed the end time +func (s *SummaryRule) IsBackfillComplete() bool { + if !s.HasActiveBackfill() { + return true // No backfill means it's "complete" + } + + startTime, err := time.Parse(time.RFC3339, s.Spec.Backfill.Start) + if err != nil { + return false // Invalid start time means not complete + } + + endTime, err := time.Parse(time.RFC3339, s.Spec.Backfill.End) + if err != nil { + return false // Invalid end time means not complete + } + + return startTime.After(endTime) || startTime.Equal(endTime) +} + +// GetNextBackfillWindow calculates the next time window for backfill execution +// Returns start time, end time, and whether a valid window exists +// Uses the same interval alignment as normal execution +func (s *SummaryRule) GetNextBackfillWindow(clk clock.Clock) (time.Time, time.Time, bool) { + if clk == nil { + clk = clock.RealClock{} + } + + if !s.HasActiveBackfill() || s.IsBackfillComplete() { + return time.Time{}, time.Time{}, false + } + + // Parse the current backfill start position + currentStart, err := time.Parse(time.RFC3339, s.Spec.Backfill.Start) + if err != nil { + return time.Time{}, time.Time{}, false + } + + // Parse the backfill end time + backfillEnd, err := time.Parse(time.RFC3339, s.Spec.Backfill.End) + if err != nil { + return time.Time{}, time.Time{}, false + } + + // Apply ingestion delay + var delay time.Duration + if s.Spec.IngestionDelay != nil { + delay = s.Spec.IngestionDelay.Duration + } + + // Align to interval boundaries like normal execution + intervalDuration := s.Spec.Interval.Duration + windowStartTime := currentStart.UTC().Truncate(intervalDuration) + windowEndTime := windowStartTime.Add(intervalDuration) + + // Don't exceed the backfill end time + if windowEndTime.After(backfillEnd) { + windowEndTime = backfillEnd + } + + // Apply ingestion delay to window + windowStartTime = windowStartTime.Add(-delay) + windowEndTime = windowEndTime.Add(-delay) + + // Validate that we have a meaningful window + if windowStartTime.After(windowEndTime) || windowStartTime.Equal(windowEndTime) { + return time.Time{}, time.Time{}, false + } + + return windowStartTime, windowEndTime, true +} + +// AdvanceBackfillProgress moves the backfill start position forward after successful execution +func (s *SummaryRule) AdvanceBackfillProgress(endTime time.Time) { + if !s.HasActiveBackfill() { + return + } + + // Apply ingestion delay in reverse to get the actual progress position + var delay time.Duration + if s.Spec.IngestionDelay != nil { + delay = s.Spec.IngestionDelay.Duration + } + + // Move start position to the end time (plus delay to account for delay applied in window calculation) + progressPosition := endTime.Add(delay) + s.Spec.Backfill.Start = progressPosition.UTC().Format(time.RFC3339) +} + +// ClearBackfillOperation clears the current operation ID from the backfill spec +func (s *SummaryRule) ClearBackfillOperation() { + if s.HasActiveBackfill() { + s.Spec.Backfill.OperationId = "" + } +} + +// SetBackfillOperation sets the operation ID for the current backfill window +func (s *SummaryRule) SetBackfillOperation(operationId string) { + if s.HasActiveBackfill() { + s.Spec.Backfill.OperationId = operationId + } +} + +// GetBackfillOperation returns the current backfill operation ID +func (s *SummaryRule) GetBackfillOperation() string { + if s.HasActiveBackfill() { + return s.Spec.Backfill.OperationId + } + return "" +} + +// RemoveBackfill removes the backfill configuration when complete +func (s *SummaryRule) RemoveBackfill() { + s.Spec.Backfill = nil +} + // +kubebuilder:object:root=true // SummaryRuleList contains a list of SummaryRule diff --git a/api/v1/summaryrule_types_test.go b/api/v1/summaryrule_types_test.go index 0487995d9..91f24f3d6 100644 --- a/api/v1/summaryrule_types_test.go +++ b/api/v1/summaryrule_types_test.go @@ -1436,3 +1436,311 @@ func TestBackfillAsyncOperations(t *testing.T) { require.NotZero(t, condition.LastTransitionTime) }) } + +func TestBackfillHelperMethods(t *testing.T) { + baseTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + endTime := time.Date(2024, 1, 2, 0, 0, 0, 0, time.UTC) + + t.Run("HasActiveBackfill", func(t *testing.T) { + tests := []struct { + name string + backfill *BackfillSpec + expected bool + }{ + { + name: "no backfill", + backfill: nil, + expected: false, + }, + { + name: "empty backfill", + backfill: &BackfillSpec{}, + expected: false, + }, + { + name: "only start set", + backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + }, + expected: false, + }, + { + name: "only end set", + backfill: &BackfillSpec{ + End: endTime.Format(time.RFC3339), + }, + expected: false, + }, + { + name: "both start and end set", + backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: endTime.Format(time.RFC3339), + }, + expected: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Backfill: tc.backfill, + }, + } + require.Equal(t, tc.expected, rule.HasActiveBackfill()) + }) + } + }) + + t.Run("IsBackfillComplete", func(t *testing.T) { + tests := []struct { + name string + backfill *BackfillSpec + expected bool + }{ + { + name: "no backfill is complete", + backfill: nil, + expected: true, + }, + { + name: "start before end", + backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: endTime.Format(time.RFC3339), + }, + expected: false, + }, + { + name: "start equals end", + backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: baseTime.Format(time.RFC3339), + }, + expected: true, + }, + { + name: "start after end", + backfill: &BackfillSpec{ + Start: endTime.Format(time.RFC3339), + End: baseTime.Format(time.RFC3339), + }, + expected: true, + }, + { + name: "invalid start time", + backfill: &BackfillSpec{ + Start: "invalid", + End: endTime.Format(time.RFC3339), + }, + expected: false, + }, + { + name: "invalid end time", + backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: "invalid", + }, + expected: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Backfill: tc.backfill, + }, + } + require.Equal(t, tc.expected, rule.IsBackfillComplete()) + }) + } + }) + + t.Run("GetNextBackfillWindow", func(t *testing.T) { + fakeClock := NewFakeClock(baseTime.Add(12 * time.Hour)) + + t.Run("no backfill returns false", func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Interval: metav1.Duration{Duration: time.Hour}, + }, + } + start, end, ok := rule.GetNextBackfillWindow(fakeClock) + require.False(t, ok) + require.True(t, start.IsZero()) + require.True(t, end.IsZero()) + }) + + t.Run("completed backfill returns false", func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Interval: metav1.Duration{Duration: time.Hour}, + Backfill: &BackfillSpec{ + Start: endTime.Format(time.RFC3339), + End: baseTime.Format(time.RFC3339), // Start after end + }, + }, + } + start, end, ok := rule.GetNextBackfillWindow(fakeClock) + require.False(t, ok) + require.True(t, start.IsZero()) + require.True(t, end.IsZero()) + }) + + t.Run("valid backfill window", func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Interval: metav1.Duration{Duration: time.Hour}, + Backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: endTime.Format(time.RFC3339), + }, + }, + } + start, end, ok := rule.GetNextBackfillWindow(fakeClock) + require.True(t, ok) + require.Equal(t, baseTime, start) + require.Equal(t, baseTime.Add(time.Hour), end) + }) + + t.Run("with ingestion delay", func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Interval: metav1.Duration{Duration: time.Hour}, + IngestionDelay: &metav1.Duration{Duration: 5 * time.Minute}, + Backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: endTime.Format(time.RFC3339), + }, + }, + } + start, end, ok := rule.GetNextBackfillWindow(fakeClock) + require.True(t, ok) + require.Equal(t, baseTime.Add(-5*time.Minute), start) + require.Equal(t, baseTime.Add(time.Hour-5*time.Minute), end) + }) + + t.Run("window end limited by backfill end", func(t *testing.T) { + // Backfill end is 30 minutes into the interval + shortEnd := baseTime.Add(30 * time.Minute) + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Interval: metav1.Duration{Duration: time.Hour}, + Backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: shortEnd.Format(time.RFC3339), + }, + }, + } + start, end, ok := rule.GetNextBackfillWindow(fakeClock) + require.True(t, ok) + require.Equal(t, baseTime, start) + require.Equal(t, shortEnd, end) + }) + + t.Run("uses real clock when nil", func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Interval: metav1.Duration{Duration: time.Hour}, + Backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: endTime.Format(time.RFC3339), + }, + }, + } + start, end, ok := rule.GetNextBackfillWindow(nil) + require.True(t, ok) + require.False(t, start.IsZero()) + require.False(t, end.IsZero()) + }) + }) + + t.Run("AdvanceBackfillProgress", func(t *testing.T) { + t.Run("advances progress correctly", func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: endTime.Format(time.RFC3339), + }, + }, + } + + newEndTime := baseTime.Add(time.Hour) + rule.AdvanceBackfillProgress(newEndTime) + require.Equal(t, newEndTime.UTC().Format(time.RFC3339), rule.Spec.Backfill.Start) + }) + + t.Run("handles ingestion delay", func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + IngestionDelay: &metav1.Duration{Duration: 5 * time.Minute}, + Backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: endTime.Format(time.RFC3339), + }, + }, + } + + newEndTime := baseTime.Add(time.Hour) + rule.AdvanceBackfillProgress(newEndTime) + // Should add back the ingestion delay + expected := newEndTime.Add(5 * time.Minute) + require.Equal(t, expected.UTC().Format(time.RFC3339), rule.Spec.Backfill.Start) + }) + + t.Run("no-op when no backfill", func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{}, + } + rule.AdvanceBackfillProgress(baseTime) + // Should not panic or error + }) + }) + + t.Run("BackfillOperationMethods", func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: endTime.Format(time.RFC3339), + }, + }, + } + + // Test SetBackfillOperation + rule.SetBackfillOperation("test-operation-id") + require.Equal(t, "test-operation-id", rule.Spec.Backfill.OperationId) + + // Test GetBackfillOperation + require.Equal(t, "test-operation-id", rule.GetBackfillOperation()) + + // Test ClearBackfillOperation + rule.ClearBackfillOperation() + require.Empty(t, rule.Spec.Backfill.OperationId) + + // Test operations on rule without backfill + ruleNoBackfill := &SummaryRule{Spec: SummaryRuleSpec{}} + ruleNoBackfill.SetBackfillOperation("test") + require.Empty(t, ruleNoBackfill.GetBackfillOperation()) + ruleNoBackfill.ClearBackfillOperation() // Should not panic + }) + + t.Run("RemoveBackfill", func(t *testing.T) { + rule := &SummaryRule{ + Spec: SummaryRuleSpec{ + Backfill: &BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: endTime.Format(time.RFC3339), + }, + }, + } + + require.True(t, rule.HasActiveBackfill()) + rule.RemoveBackfill() + require.False(t, rule.HasActiveBackfill()) + require.Nil(t, rule.Spec.Backfill) + }) +} From 865884f89d8de0b780e6b73e54ee09112b33f22f Mon Sep 17 00:00:00 2001 From: Jesse Thompson Date: Fri, 1 Aug 2025 16:14:22 +0000 Subject: [PATCH 4/9] feat: integrate backfill execution into SummaryRuleTask - Add handleBackfillExecution() method for backfill operation management - Add handleBackfillOperationStatus() to track running backfill operations - Add submitNextBackfillWindow() to submit next backfill window - Integrate backfill execution into main Run() loop alongside normal execution - Handle backfill completion and automatic cleanup - Support operation retry and error handling for backfill operations - Log backfill progress for visibility and debugging --- ingestor/adx/tasks.go | 118 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/ingestor/adx/tasks.go b/ingestor/adx/tasks.go index 659d54793..c77f3c73d 100644 --- a/ingestor/adx/tasks.go +++ b/ingestor/adx/tasks.go @@ -335,6 +335,12 @@ func (t *SummaryRuleTask) Run(ctx context.Context) error { // Handle rule execution logic (timing evaluation and submission) err := t.handleRuleExecution(timeoutCtx, &rule) + // Handle backfill execution logic (check progress and submit next window if needed) + backfillErr := t.handleBackfillExecution(timeoutCtx, &rule) + if err == nil { + err = backfillErr // Use backfill error if normal execution succeeded + } + // Process any backlog operations for this rule rule.BackfillAsyncOperations(t.Clock) @@ -432,6 +438,118 @@ func (t *SummaryRuleTask) handleRuleExecution(ctx context.Context, rule *v1.Summ return nil } +// handleBackfillExecution handles backfill operation logic for a single summary rule. +// It checks if there's an active backfill, manages operation submission and progress tracking. +// Returns any submission or operation error that occurred. +func (t *SummaryRuleTask) handleBackfillExecution(ctx context.Context, rule *v1.SummaryRule) error { + if !rule.HasActiveBackfill() { + return nil + } + + // Check if backfill is complete + if rule.IsBackfillComplete() { + logger.Infof("Backfill complete for rule %s.%s, removing backfill configuration", rule.Spec.Database, rule.Name) + rule.RemoveBackfill() + return nil + } + + // Check if there's a current operation in progress + currentOperationId := rule.GetBackfillOperation() + if currentOperationId != "" { + // There's an operation in progress, check its status + return t.handleBackfillOperationStatus(ctx, rule, currentOperationId) + } + + // No operation in progress, submit the next backfill window + return t.submitNextBackfillWindow(ctx, rule) +} + +// handleBackfillOperationStatus checks the status of a running backfill operation +func (t *SummaryRuleTask) handleBackfillOperationStatus(ctx context.Context, rule *v1.SummaryRule, operationId string) error { + // Get current Kusto operations to check status + kustoAsyncOperations, err := t.GetOperations(ctx) + if err != nil { + // If we can't get operations status, we'll retry next time + logger.Warnf("Failed to get async operations for backfill operation %s: %v", operationId, err) + return nil + } + + // Find our operation + index := slices.IndexFunc(kustoAsyncOperations, func(item AsyncOperationStatus) bool { + return item.OperationId == operationId + }) + + if index == -1 { + // Operation not found, possibly too old or completed outside our window + logger.Warnf("Backfill operation %s not found in Kusto operations, clearing operation ID", operationId) + rule.ClearBackfillOperation() + return nil + } + + kustoOp := kustoAsyncOperations[index] + + // Check if operation is complete + if IsKustoAsyncOperationStateCompleted(kustoOp.State) { + if kustoOp.State == string(KustoAsyncOperationStateFailed) { + logger.Errorf("Backfill operation %s for rule %s.%s failed: %s", operationId, rule.Spec.Database, rule.Name, kustoOp.Status) + rule.ClearBackfillOperation() + // Return error to be tracked in rule status + if kustoOp.Status != "" { + return fmt.Errorf("backfill operation %s failed: %s", operationId, kustoOp.Status) + } + return fmt.Errorf("backfill operation %s failed", operationId) + } else { + // Operation succeeded, advance backfill progress + logger.Infof("Backfill operation %s for rule %s.%s completed successfully", operationId, rule.Spec.Database, rule.Name) + + // Get the window that was just processed to advance progress + _, windowEnd, ok := rule.GetNextBackfillWindow(t.Clock) + if ok { + rule.AdvanceBackfillProgress(windowEnd) + logger.Infof("Advanced backfill progress for rule %s.%s to %v", rule.Spec.Database, rule.Name, windowEnd) + } + rule.ClearBackfillOperation() + } + } + + // Check if operation should be retried + if kustoOp.ShouldRetry != 0 { + logger.Infof("Backfill operation %s for rule %s.%s is marked for retry, resubmitting", operationId, rule.Spec.Database, rule.Name) + rule.ClearBackfillOperation() + return t.submitNextBackfillWindow(ctx, rule) + } + + return nil +} + +// submitNextBackfillWindow submits the next backfill window for execution +func (t *SummaryRuleTask) submitNextBackfillWindow(ctx context.Context, rule *v1.SummaryRule) error { + windowStartTime, windowEndTime, ok := rule.GetNextBackfillWindow(t.Clock) + if !ok { + // No valid window, backfill might be complete + if rule.IsBackfillComplete() { + logger.Infof("Backfill complete for rule %s.%s, removing backfill configuration", rule.Spec.Database, rule.Name) + rule.RemoveBackfill() + } + return nil + } + + // Subtract 1 tick from endTime for the query to avoid boundary issues + queryEndTime := windowEndTime.Add(-kustoutil.OneTick) + + operationId, err := t.SubmitRule(ctx, *rule, windowStartTime.Format(time.RFC3339Nano), queryEndTime.Format(time.RFC3339Nano)) + if err != nil { + return fmt.Errorf("failed to submit backfill operation for rule %s.%s (window %v to %v): %w", + rule.Spec.Database, rule.Name, windowStartTime, windowEndTime, err) + } + + rule.SetBackfillOperation(operationId) + logger.Infof("Submitted backfill operation %s for rule %s.%s (window %v to %v)", + operationId, rule.Spec.Database, rule.Name, windowStartTime, windowEndTime) + + return nil +} + func (t *SummaryRuleTask) trackAsyncOperations(ctx context.Context, rule *v1.SummaryRule, kustoAsyncOperations []AsyncOperationStatus) { operations := rule.GetAsyncOperations() for _, op := range operations { From 8c41ac5c8c26db30381ab45dc3335d450f55e45d Mon Sep 17 00:00:00 2001 From: Jesse Thompson Date: Fri, 1 Aug 2025 16:36:01 +0000 Subject: [PATCH 5/9] feat: add comprehensive unit tests for backfill task integration - Add TestSummaryRuleTask_BackfillIntegration with 9 test scenarios - Cover operation submission, progress advancement, error handling - Test retry logic, completion detection, and edge cases - Include ingestion delay support and missing operation handling - Follow existing test patterns with proper mocking and assertions --- api/v1/summaryrule_types.go | 4 +- ingestor/adx/tasks.go | 6 +- ingestor/adx/tasks_test.go | 332 ++++++++++++++++++++++++++++++++++++ 3 files changed, 337 insertions(+), 5 deletions(-) diff --git a/api/v1/summaryrule_types.go b/api/v1/summaryrule_types.go index 2656d415b..232cae70d 100644 --- a/api/v1/summaryrule_types.go +++ b/api/v1/summaryrule_types.go @@ -50,13 +50,13 @@ type BackfillSpec struct { // +kubebuilder:validation:Type=string // +kubebuilder:validation:Format=date-time Start string `json:"start"` - + // End is the target end time for backfill (RFC3339 format) // +kubebuilder:validation:Required // +kubebuilder:validation:Type=string // +kubebuilder:validation:Format=date-time End string `json:"end"` - + // OperationId is the current active Kusto operation ID for this backfill window // +optional OperationId string `json:"operationId,omitempty"` diff --git a/ingestor/adx/tasks.go b/ingestor/adx/tasks.go index c77f3c73d..fa4b60226 100644 --- a/ingestor/adx/tasks.go +++ b/ingestor/adx/tasks.go @@ -501,7 +501,7 @@ func (t *SummaryRuleTask) handleBackfillOperationStatus(ctx context.Context, rul } else { // Operation succeeded, advance backfill progress logger.Infof("Backfill operation %s for rule %s.%s completed successfully", operationId, rule.Spec.Database, rule.Name) - + // Get the window that was just processed to advance progress _, windowEnd, ok := rule.GetNextBackfillWindow(t.Clock) if ok { @@ -539,12 +539,12 @@ func (t *SummaryRuleTask) submitNextBackfillWindow(ctx context.Context, rule *v1 operationId, err := t.SubmitRule(ctx, *rule, windowStartTime.Format(time.RFC3339Nano), queryEndTime.Format(time.RFC3339Nano)) if err != nil { - return fmt.Errorf("failed to submit backfill operation for rule %s.%s (window %v to %v): %w", + return fmt.Errorf("failed to submit backfill operation for rule %s.%s (window %v to %v): %w", rule.Spec.Database, rule.Name, windowStartTime, windowEndTime, err) } rule.SetBackfillOperation(operationId) - logger.Infof("Submitted backfill operation %s for rule %s.%s (window %v to %v)", + logger.Infof("Submitted backfill operation %s for rule %s.%s (window %v to %v)", operationId, rule.Spec.Database, rule.Name, windowStartTime, windowEndTime) return nil diff --git a/ingestor/adx/tasks_test.go b/ingestor/adx/tasks_test.go index 4eb5fd61b..1100146b9 100644 --- a/ingestor/adx/tasks_test.go +++ b/ingestor/adx/tasks_test.go @@ -1879,3 +1879,335 @@ func TestSummaryRuleHandlesMixedAsyncOperationStatesCorrectly(t *testing.T) { } require.True(t, retryOpFound, "Retry operation should be present in final operations") } + +func TestSummaryRuleTask_BackfillIntegration(t *testing.T) { + baseTime := time.Date(2024, 6, 23, 10, 0, 0, 0, time.UTC) + fakeClock := klock.NewFakeClock(baseTime) + + type submitCall struct { + rule v1.SummaryRule + startTime string + endTime string + opId string + } + + t.Run("submits backfill operation when no operation in progress", func(t *testing.T) { + rule := &v1.SummaryRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rule", + Namespace: "default", + }, + Spec: v1.SummaryRuleSpec{ + Database: "TestDB", + Table: "TestTable", + Body: "TestTable | summarize count() by bin(Timestamp, 1h)", + Interval: metav1.Duration{Duration: time.Hour}, + Backfill: &v1.BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: baseTime.Add(24 * time.Hour).Format(time.RFC3339), + }, + }, + } + + // Mock successful submission + var submitCalls []submitCall + task := &SummaryRuleTask{ + Clock: fakeClock, + SubmitRule: func(ctx context.Context, rule v1.SummaryRule, startTime, endTime string) (string, error) { + opId := fmt.Sprintf("backfill-op-%d", len(submitCalls)+1) + submitCalls = append(submitCalls, submitCall{ + rule: rule, + startTime: startTime, + endTime: endTime, + opId: opId, + }) + return opId, nil + }, + GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { + return []AsyncOperationStatus{}, nil + }, + } + + err := task.handleBackfillExecution(context.Background(), rule) + require.NoError(t, err) + + // Verify operation was submitted + require.Len(t, submitCalls, 1) + require.Equal(t, "backfill-op-1", rule.GetBackfillOperation()) + + // Verify time window + require.Equal(t, baseTime.Format(time.RFC3339Nano), submitCalls[0].startTime) + expectedEnd := baseTime.Add(time.Hour).Add(-kustoutil.OneTick) + require.Equal(t, expectedEnd.Format(time.RFC3339Nano), submitCalls[0].endTime) + }) + + t.Run("advances progress after successful operation completion", func(t *testing.T) { + rule := &v1.SummaryRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rule", + Namespace: "default", + }, + Spec: v1.SummaryRuleSpec{ + Database: "TestDB", + Table: "TestTable", + Body: "TestTable | summarize count() by bin(Timestamp, 1h)", + Interval: metav1.Duration{Duration: time.Hour}, + Backfill: &v1.BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: baseTime.Add(24 * time.Hour).Format(time.RFC3339), + OperationId: "test-operation-123", + }, + }, + } + + task := &SummaryRuleTask{ + Clock: fakeClock, + GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { + return []AsyncOperationStatus{ + { + OperationId: "test-operation-123", + State: string(KustoAsyncOperationStateCompleted), + }, + }, nil + }, + } + + err := task.handleBackfillExecution(context.Background(), rule) + require.NoError(t, err) + + // Verify progress was advanced + expectedNewStart := baseTime.Add(time.Hour) + require.Equal(t, expectedNewStart.Format(time.RFC3339), rule.Spec.Backfill.Start) + + // Verify operation ID was cleared + require.Empty(t, rule.GetBackfillOperation()) + }) + + t.Run("handles failed operation by clearing operation ID", func(t *testing.T) { + rule := &v1.SummaryRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rule", + Namespace: "default", + }, + Spec: v1.SummaryRuleSpec{ + Database: "TestDB", + Table: "TestTable", + Body: "TestTable | summarize count() by bin(Timestamp, 1h)", + Interval: metav1.Duration{Duration: time.Hour}, + Backfill: &v1.BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: baseTime.Add(24 * time.Hour).Format(time.RFC3339), + OperationId: "failed-operation-456", + }, + }, + } + + task := &SummaryRuleTask{ + Clock: fakeClock, + GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { + return []AsyncOperationStatus{ + { + OperationId: "failed-operation-456", + State: string(KustoAsyncOperationStateFailed), + Status: "Query execution failed", + }, + }, nil + }, + } + + err := task.handleBackfillExecution(context.Background(), rule) + require.Error(t, err) + require.Contains(t, err.Error(), "backfill operation failed-operation-456 failed") + + // Verify operation ID was cleared but progress not advanced + require.Empty(t, rule.GetBackfillOperation()) + require.Equal(t, baseTime.Format(time.RFC3339), rule.Spec.Backfill.Start) + }) + + t.Run("handles retry operation by resubmitting", func(t *testing.T) { + rule := &v1.SummaryRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rule", + Namespace: "default", + }, + Spec: v1.SummaryRuleSpec{ + Database: "TestDB", + Table: "TestTable", + Body: "TestTable | summarize count() by bin(Timestamp, 1h)", + Interval: metav1.Duration{Duration: time.Hour}, + Backfill: &v1.BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: baseTime.Add(24 * time.Hour).Format(time.RFC3339), + OperationId: "retry-operation-789", + }, + }, + } + + var submitCalls []submitCall + task := &SummaryRuleTask{ + Clock: fakeClock, + SubmitRule: func(ctx context.Context, rule v1.SummaryRule, startTime, endTime string) (string, error) { + opId := "retry-operation-new" + submitCalls = append(submitCalls, submitCall{ + rule: rule, + startTime: startTime, + endTime: endTime, + opId: opId, + }) + return opId, nil + }, + GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { + return []AsyncOperationStatus{ + { + OperationId: "retry-operation-789", + State: string(KustoAsyncOperationStateInProgress), + ShouldRetry: 1, + }, + }, nil + }, + } + + err := task.handleBackfillExecution(context.Background(), rule) + require.NoError(t, err) + + // Verify retry submission + require.Len(t, submitCalls, 1) + require.Equal(t, "retry-operation-new", rule.GetBackfillOperation()) + }) + + t.Run("removes backfill when complete", func(t *testing.T) { + // Backfill already at end time + endTime := baseTime.Add(time.Hour) + rule := &v1.SummaryRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rule", + Namespace: "default", + }, + Spec: v1.SummaryRuleSpec{ + Database: "TestDB", + Table: "TestTable", + Body: "TestTable | summarize count() by bin(Timestamp, 1h)", + Interval: metav1.Duration{Duration: time.Hour}, + Backfill: &v1.BackfillSpec{ + Start: endTime.Format(time.RFC3339), + End: endTime.Format(time.RFC3339), + }, + }, + } + + task := &SummaryRuleTask{ + Clock: fakeClock, + } + + err := task.handleBackfillExecution(context.Background(), rule) + require.NoError(t, err) + + // Verify backfill was removed + require.Nil(t, rule.Spec.Backfill) + }) + + t.Run("no-op when no backfill configured", func(t *testing.T) { + rule := &v1.SummaryRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rule", + Namespace: "default", + }, + Spec: v1.SummaryRuleSpec{ + Database: "TestDB", + Table: "TestTable", + Body: "TestTable | summarize count() by bin(Timestamp, 1h)", + Interval: metav1.Duration{Duration: time.Hour}, + // No backfill configured + }, + } + + task := &SummaryRuleTask{ + Clock: fakeClock, + } + + err := task.handleBackfillExecution(context.Background(), rule) + require.NoError(t, err) + // Should complete without error + }) + + t.Run("handles operation not found in Kusto", func(t *testing.T) { + rule := &v1.SummaryRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rule", + Namespace: "default", + }, + Spec: v1.SummaryRuleSpec{ + Database: "TestDB", + Table: "TestTable", + Body: "TestTable | summarize count() by bin(Timestamp, 1h)", + Interval: metav1.Duration{Duration: time.Hour}, + Backfill: &v1.BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: baseTime.Add(24 * time.Hour).Format(time.RFC3339), + OperationId: "missing-operation", + }, + }, + } + + task := &SummaryRuleTask{ + Clock: fakeClock, + GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { + return []AsyncOperationStatus{}, nil // Operation not found + }, + } + + err := task.handleBackfillExecution(context.Background(), rule) + require.NoError(t, err) + + // Verify operation ID was cleared + require.Empty(t, rule.GetBackfillOperation()) + }) + + t.Run("with ingestion delay", func(t *testing.T) { + rule := &v1.SummaryRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rule", + Namespace: "default", + }, + Spec: v1.SummaryRuleSpec{ + Database: "TestDB", + Table: "TestTable", + Body: "TestTable | summarize count() by bin(Timestamp, 1h)", + Interval: metav1.Duration{Duration: time.Hour}, + IngestionDelay: &metav1.Duration{Duration: 5 * time.Minute}, + Backfill: &v1.BackfillSpec{ + Start: baseTime.Format(time.RFC3339), + End: baseTime.Add(24 * time.Hour).Format(time.RFC3339), + }, + }, + } + + var submitCalls []submitCall + task := &SummaryRuleTask{ + Clock: fakeClock, + SubmitRule: func(ctx context.Context, rule v1.SummaryRule, startTime, endTime string) (string, error) { + opId := "backfill-with-delay" + submitCalls = append(submitCalls, submitCall{ + rule: rule, + startTime: startTime, + endTime: endTime, + opId: opId, + }) + return opId, nil + }, + GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { + return []AsyncOperationStatus{}, nil + }, + } + + err := task.handleBackfillExecution(context.Background(), rule) + require.NoError(t, err) + + // Verify time window accounts for ingestion delay + require.Len(t, submitCalls, 1) + expectedStart := baseTime.Add(-5 * time.Minute) + expectedEnd := baseTime.Add(time.Hour - 5*time.Minute).Add(-kustoutil.OneTick) + require.Equal(t, expectedStart.Format(time.RFC3339Nano), submitCalls[0].startTime) + require.Equal(t, expectedEnd.Format(time.RFC3339Nano), submitCalls[0].endTime) + }) +} From 9b76083219cec1abdff7cbb25d0c4a15f880013f Mon Sep 17 00:00:00 2001 From: Jesse Thompson Date: Fri, 1 Aug 2025 17:24:47 +0000 Subject: [PATCH 6/9] refactor: optimize backfill operation status checking and fix progress advancement - Add GetOperation function field to SummaryRuleTask for single-operation queries - Implement parameterized KQL query following databaseExists pattern for injection prevention - Update handleBackfillOperationStatus to use efficient single-operation lookup - Factor out shared calculateTimeWindow method for backfill operations - Add comprehensive comments explaining why NextExecutionWindow cannot share logic - Fix AdvanceBackfillProgress to properly reverse delay calculation - Update all backfill tests to mock new GetOperation function - Maintain backward compatibility with existing execution patterns --- api/v1/summaryrule_types.go | 91 ++++++++++++++++++++++++++++--------- ingestor/adx/tasks.go | 55 +++++++++++++++++----- ingestor/adx/tasks_test.go | 45 +++++++++--------- 3 files changed, 137 insertions(+), 54 deletions(-) diff --git a/api/v1/summaryrule_types.go b/api/v1/summaryrule_types.go index 232cae70d..ae7bdd302 100644 --- a/api/v1/summaryrule_types.go +++ b/api/v1/summaryrule_types.go @@ -317,6 +317,71 @@ func (s *SummaryRule) ShouldSubmitRule(clk clock.Clock) bool { (lastSuccessfulEndTime == nil && clk.Since(cnd.LastTransitionTime.Time) >= s.Spec.Interval.Duration) // First execution timing } +// calculateTimeWindow handles core window calculation logic for backfill operations. +// +// IMPORTANT: This method intentionally does NOT share logic with NextExecutionWindow() +// despite apparent similarities. The methods implement fundamentally different approaches +// to ingestion delay handling: +// +// calculateTimeWindow (this method): +// - Applies ingestion delay AFTER interval alignment +// - Calculates clean interval boundaries first: startPos.truncate(interval) +// - Then shifts the entire window: window.add(-delay) +// - Maintains consistent time boundaries across backfill operations +// +// NextExecutionWindow (normal execution): +// - Applies ingestion delay BEFORE interval alignment +// - Uses delay to determine the alignment point itself +// - Creates execution windows that are "shifted" by the delay amount +// +// These represent two different architectural approaches that cannot be unified without +// breaking existing behavior. See NextExecutionWindow() comment for more details. +func (s *SummaryRule) calculateTimeWindow(clk clock.Clock, startPosition time.Time, maxEndTime *time.Time) (time.Time, time.Time) { + if clk == nil { + clk = clock.RealClock{} + } + + var delay time.Duration + if s.Spec.IngestionDelay != nil { + delay = s.Spec.IngestionDelay.Duration + } + + // Align to interval boundaries like normal execution + intervalDuration := s.Spec.Interval.Duration + windowStartTime := startPosition.UTC().Truncate(intervalDuration) + windowEndTime := windowStartTime.Add(intervalDuration) + + // Apply boundary constraints if provided (don't exceed backfill end time) + if maxEndTime != nil && windowEndTime.After(*maxEndTime) { + windowEndTime = *maxEndTime + } + + // Apply ingestion delay to window + windowStartTime = windowStartTime.Add(-delay) + windowEndTime = windowEndTime.Add(-delay) + + return windowStartTime, windowEndTime +} + +// NextExecutionWindow calculates the time window for normal interval-based execution. +// +// IMPORTANT: This method intentionally does NOT share logic with calculateTimeWindow() +// because they implement fundamentally different approaches to ingestion delay handling: +// +// NextExecutionWindow (this method): +// - Applies ingestion delay BEFORE interval alignment +// - For first execution: (currentTime - delay).truncate(interval) = endTime +// - For subsequent: (lastEndTime - delay).truncate(interval) = startTime +// - This creates execution windows that are "shifted" by the delay amount +// +// calculateTimeWindow (backfill method): +// - Applies ingestion delay AFTER interval alignment +// - Calculates clean interval boundaries first, then shifts the entire window +// - This maintains consistent time window boundaries across backfill operations +// +// These represent two different time calculation strategies that serve different purposes +// and cannot be unified without breaking existing behavior. Any attempt to share this +// logic will break the comprehensive test suite that validates ingestion delay behavior. func (s *SummaryRule) NextExecutionWindow(clk clock.Clock) (windowStartTime time.Time, windowEndTime time.Time) { if clk == nil { clk = clock.RealClock{} @@ -516,25 +581,8 @@ func (s *SummaryRule) GetNextBackfillWindow(clk clock.Clock) (time.Time, time.Ti return time.Time{}, time.Time{}, false } - // Apply ingestion delay - var delay time.Duration - if s.Spec.IngestionDelay != nil { - delay = s.Spec.IngestionDelay.Duration - } - - // Align to interval boundaries like normal execution - intervalDuration := s.Spec.Interval.Duration - windowStartTime := currentStart.UTC().Truncate(intervalDuration) - windowEndTime := windowStartTime.Add(intervalDuration) - - // Don't exceed the backfill end time - if windowEndTime.After(backfillEnd) { - windowEndTime = backfillEnd - } - - // Apply ingestion delay to window - windowStartTime = windowStartTime.Add(-delay) - windowEndTime = windowEndTime.Add(-delay) + // Use shared window calculation logic with backfill end as boundary + windowStartTime, windowEndTime := s.calculateTimeWindow(clk, currentStart, &backfillEnd) // Validate that we have a meaningful window if windowStartTime.After(windowEndTime) || windowStartTime.Equal(windowEndTime) { @@ -550,13 +598,14 @@ func (s *SummaryRule) AdvanceBackfillProgress(endTime time.Time) { return } - // Apply ingestion delay in reverse to get the actual progress position + // The endTime passed in is already delay-adjusted from calculateTimeWindow, + // so we need to reverse the delay to get the actual interval boundary position var delay time.Duration if s.Spec.IngestionDelay != nil { delay = s.Spec.IngestionDelay.Duration } - // Move start position to the end time (plus delay to account for delay applied in window calculation) + // Add delay back to get the clean interval boundary that should be our next start position progressPosition := endTime.Add(delay) s.Spec.Backfill.Start = progressPosition.UTC().Format(time.RFC3339) } diff --git a/ingestor/adx/tasks.go b/ingestor/adx/tasks.go index fa4b60226..12fbe341b 100644 --- a/ingestor/adx/tasks.go +++ b/ingestor/adx/tasks.go @@ -243,6 +243,7 @@ type SummaryRuleTask struct { store storage.CRDHandler kustoCli StatementExecutor GetOperations func(ctx context.Context) ([]AsyncOperationStatus, error) + GetOperation func(ctx context.Context, operationId string) (*AsyncOperationStatus, error) SubmitRule func(ctx context.Context, rule v1.SummaryRule, startTime, endTime string) (string, error) ClusterLabels map[string]string Clock clock.Clock @@ -257,6 +258,7 @@ func NewSummaryRuleTask(store storage.CRDHandler, kustoCli StatementExecutor, cl } // Set the default implementations task.GetOperations = task.getOperations + task.GetOperation = task.getOperation task.SubmitRule = task.submitRule return task } @@ -466,28 +468,21 @@ func (t *SummaryRuleTask) handleBackfillExecution(ctx context.Context, rule *v1. // handleBackfillOperationStatus checks the status of a running backfill operation func (t *SummaryRuleTask) handleBackfillOperationStatus(ctx context.Context, rule *v1.SummaryRule, operationId string) error { - // Get current Kusto operations to check status - kustoAsyncOperations, err := t.GetOperations(ctx) + // Use the optimized single-operation query + kustoOp, err := t.GetOperation(ctx, operationId) if err != nil { - // If we can't get operations status, we'll retry next time - logger.Warnf("Failed to get async operations for backfill operation %s: %v", operationId, err) + // If we can't get operation status, we'll retry next time + logger.Warnf("Failed to get status for backfill operation %s: %v", operationId, err) return nil } - // Find our operation - index := slices.IndexFunc(kustoAsyncOperations, func(item AsyncOperationStatus) bool { - return item.OperationId == operationId - }) - - if index == -1 { + if kustoOp == nil { // Operation not found, possibly too old or completed outside our window logger.Warnf("Backfill operation %s not found in Kusto operations, clearing operation ID", operationId) rule.ClearBackfillOperation() return nil } - kustoOp := kustoAsyncOperations[index] - // Check if operation is complete if IsKustoAsyncOperationStateCompleted(kustoOp.State) { if kustoOp.State == string(KustoAsyncOperationStateFailed) { @@ -679,6 +674,42 @@ func (t *SummaryRuleTask) getOperations(ctx context.Context) ([]AsyncOperationSt return operations, nil } +func (t *SummaryRuleTask) getOperation(ctx context.Context, operationId string) (*AsyncOperationStatus, error) { + // Query for a specific operation using parameters to prevent injection + stmt := kql.New(".show operations | where OperationId == @ParamOperationId | summarize arg_max(LastUpdatedOn, OperationId, State, ShouldRetry, Status) | project LastUpdatedOn, OperationId = tostring(OperationId), State, ShouldRetry = todouble(ShouldRetry), Status") + params := kql.NewParameters().AddString("ParamOperationId", operationId) + + rows, err := t.kustoCli.Mgmt(ctx, stmt, kusto.QueryParameters(params)) + if err != nil { + return nil, fmt.Errorf("failed to retrieve operation %s: %w", operationId, err) + } + defer rows.Stop() + + for { + row, errInline, errFinal := rows.NextRowOrError() + if errFinal == io.EOF { + break + } + if errInline != nil { + continue + } + if errFinal != nil { + return nil, fmt.Errorf("failed to retrieve operation %s: %v", operationId, errFinal) + } + + var status AsyncOperationStatus + if err := row.ToStruct(&status); err != nil { + return nil, fmt.Errorf("failed to parse operation %s: %v", operationId, err) + } + if status.State != "" { + return &status, nil + } + } + + // Operation not found + return nil, nil +} + func operationIDFromResult(iter *kusto.RowIterator) (string, error) { defer iter.Stop() diff --git a/ingestor/adx/tasks_test.go b/ingestor/adx/tasks_test.go index 1100146b9..50f78cbba 100644 --- a/ingestor/adx/tasks_test.go +++ b/ingestor/adx/tasks_test.go @@ -1923,8 +1923,8 @@ func TestSummaryRuleTask_BackfillIntegration(t *testing.T) { }) return opId, nil }, - GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { - return []AsyncOperationStatus{}, nil + GetOperation: func(ctx context.Context, operationId string) (*AsyncOperationStatus, error) { + return nil, nil // No operation found, should submit new one }, } @@ -1962,13 +1962,14 @@ func TestSummaryRuleTask_BackfillIntegration(t *testing.T) { task := &SummaryRuleTask{ Clock: fakeClock, - GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { - return []AsyncOperationStatus{ - { + GetOperation: func(ctx context.Context, operationId string) (*AsyncOperationStatus, error) { + if operationId == "test-operation-123" { + return &AsyncOperationStatus{ OperationId: "test-operation-123", State: string(KustoAsyncOperationStateCompleted), - }, - }, nil + }, nil + } + return nil, nil }, } @@ -2004,14 +2005,15 @@ func TestSummaryRuleTask_BackfillIntegration(t *testing.T) { task := &SummaryRuleTask{ Clock: fakeClock, - GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { - return []AsyncOperationStatus{ - { + GetOperation: func(ctx context.Context, operationId string) (*AsyncOperationStatus, error) { + if operationId == "failed-operation-456" { + return &AsyncOperationStatus{ OperationId: "failed-operation-456", State: string(KustoAsyncOperationStateFailed), Status: "Query execution failed", - }, - }, nil + }, nil + } + return nil, nil }, } @@ -2056,14 +2058,15 @@ func TestSummaryRuleTask_BackfillIntegration(t *testing.T) { }) return opId, nil }, - GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { - return []AsyncOperationStatus{ - { + GetOperation: func(ctx context.Context, operationId string) (*AsyncOperationStatus, error) { + if operationId == "retry-operation-789" { + return &AsyncOperationStatus{ OperationId: "retry-operation-789", State: string(KustoAsyncOperationStateInProgress), ShouldRetry: 1, - }, - }, nil + }, nil + } + return nil, nil }, } @@ -2151,8 +2154,8 @@ func TestSummaryRuleTask_BackfillIntegration(t *testing.T) { task := &SummaryRuleTask{ Clock: fakeClock, - GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { - return []AsyncOperationStatus{}, nil // Operation not found + GetOperation: func(ctx context.Context, operationId string) (*AsyncOperationStatus, error) { + return nil, nil // Operation not found }, } @@ -2195,8 +2198,8 @@ func TestSummaryRuleTask_BackfillIntegration(t *testing.T) { }) return opId, nil }, - GetOperations: func(ctx context.Context) ([]AsyncOperationStatus, error) { - return []AsyncOperationStatus{}, nil + GetOperation: func(ctx context.Context, operationId string) (*AsyncOperationStatus, error) { + return nil, nil // No operation found, should submit new one }, } From 55c74742de4acc5d38b85ac244d1074035d91d91 Mon Sep 17 00:00:00 2001 From: Jesse Thompson Date: Fri, 1 Aug 2025 17:39:08 +0000 Subject: [PATCH 7/9] docs: add backfill feature documentation and examples - Update CRD reference with comprehensive backfill documentation - Add backfill field to SummaryRule key fields and examples - Create detailed backfill feature section with workflow explanation - Add examples directory with three comprehensive YAML examples: - Basic hourly aggregation with 30-day backfill - Advanced example with ingestion delay and criteria - Large-scale daily migration example - Include comprehensive troubleshooting guide covering: - Common issues and solutions - Progress monitoring techniques - Performance optimization strategies - Advanced troubleshooting commands - Add examples reference to main documentation index - Document limitations, best practices, and monitoring approaches --- docs/crds.md | 72 ++++++- docs/examples/README.md | 49 +++++ docs/examples/backfill-troubleshooting.md | 203 ++++++++++++++++++ .../summaryrule-backfill-advanced.yaml | 51 +++++ docs/examples/summaryrule-backfill-basic.yaml | 36 ++++ .../summaryrule-backfill-migration.yaml | 44 ++++ docs/index.md | 1 + 7 files changed, 455 insertions(+), 1 deletion(-) create mode 100644 docs/examples/README.md create mode 100644 docs/examples/backfill-troubleshooting.md create mode 100644 docs/examples/summaryrule-backfill-advanced.yaml create mode 100644 docs/examples/summaryrule-backfill-basic.yaml create mode 100644 docs/examples/summaryrule-backfill-migration.yaml diff --git a/docs/crds.md b/docs/crds.md index 2e8d123e8..a93b6cb45 100644 --- a/docs/crds.md +++ b/docs/crds.md @@ -8,7 +8,7 @@ This page summarizes all Custom Resource Definitions (CRDs) managed by adx-mon, | Ingestor | Ingests telemetry from collectors, manages WAL, uploads to ADX | image, replicas, endpoint, exposeExternally, adxClusterSelector | [Operator Design](designs/operator.md#ingestor-crd) | | Collector | Collects metrics/logs/traces, forwards to Ingestor | image, ingestorEndpoint | [Operator Design](designs/operator.md#collector-crd) | | Alerter | Runs alert rules, sends notifications | image, notificationEndpoint, adxClusterSelector | [Operator Design](designs/operator.md#alerter-crd) | -| SummaryRule | Automates periodic KQL aggregations with async operation tracking, time window management, cluster label substitutions, and criteria-based execution | database, name, body, table, interval, criteria | [Summary Rules](designs/summary-rules.md#crd) | +| SummaryRule | Automates periodic KQL aggregations with async operation tracking, time window management, cluster label substitutions, criteria-based execution, and historical data backfill | database, name, body, table, interval, criteria, backfill | [Summary Rules](designs/summary-rules.md#crd) | | Function | Defines KQL functions/views for ADX | name, body, database, table, isView, parameters | [Schema ETL](designs/schema-etl.md#crd) | | ManagementCommand | Declarative cluster management commands | command, args, target, schedule | [Management Commands](designs/management-commands.md#crd) | @@ -199,6 +199,26 @@ spec: - production ``` +**Backfill Example for Historical Data Processing:** +```yaml +apiVersion: adx-mon.azure.com/v1 +kind: SummaryRule +metadata: + name: historical-summary +spec: + database: MyDB + name: HistoricalSummary + body: | + MyTable + | where Timestamp between (_startTime .. _endTime) + | summarize count() by bin(Timestamp, 1h) + table: MySummaryTable + interval: 1h + backfill: + start: "2024-01-01T00:00:00Z" + end: "2024-01-31T23:59:59Z" +``` + **Key Fields:** - `database`: Target ADX database. - `name`: Logical name for the rule. @@ -206,6 +226,7 @@ spec: - `table`: Destination table for results. - `interval`: How often to run the summary (e.g., `1h`). - `criteria`: _(Optional)_ Key/value pairs used to determine when a summary rule can execute. If empty or omitted, the rule always executes. Keys and values are deployment-specific and configured on ingestor instances via `--cluster-labels`. For a rule to execute, any one of the criteria must match (OR logic). Matching is case-insensitive. +- `backfill`: _(Optional)_ Historical data processing configuration. When specified, the rule will process historical data from the start datetime to the end datetime in addition to normal interval execution. The system will automatically advance through time windows and remove this field when complete. Limited to 30 days maximum lookback period. **Required Placeholders:** - `_startTime`: Replaced with the start time of the current execution interval as `datetime(...)`. @@ -240,6 +261,55 @@ SummaryRules are managed by the Ingestor's `SummaryRuleTask`, which runs periodi **Error Handling**: Uses `UpdateStatusWithKustoErrorParsing()` to extract meaningful error messages from ADX responses and truncate long errors to 256 characters. +### Backfill Feature for Historical Data Processing + +SummaryRules support backfill operations to process historical data across specified date ranges. This feature enables you to apply summary logic to existing data in ADX. + +**How Backfill Works:** + +1. **Specification**: Add a `backfill` field to your SummaryRule with `start` and `end` datetime values (RFC3339 format). + +2. **Serial Execution**: Backfill processes one time window at a time using the same interval as normal execution, preventing Kusto cluster overload. + +3. **Progress Tracking**: The system automatically advances the `start` position after each successful window completion. You can monitor progress by inspecting the `spec.backfill.start` field. + +4. **Parallel Operation**: Backfill runs alongside normal interval execution - your rule continues processing new data while catching up on historical data. + +5. **Auto-completion**: When backfill reaches the `end` time, the entire `backfill` field is automatically removed from the spec. + +**Backfill Configuration:** + +```yaml +spec: + backfill: + start: "2024-01-01T00:00:00Z" # Current backfill position (auto-updated) + end: "2024-01-31T23:59:59Z" # Target end time (fixed) + operationId: "..." # Current operation ID (auto-managed) +``` + +**Limitations and Best Practices:** + +- **Maximum Lookback**: 30 days maximum between start and end times +- **Time Format**: Use RFC3339 format (`YYYY-MM-DDTHH:MM:SSZ`) +- **Data Overlap**: Users are responsible for handling any data overlap concerns between backfill and normal execution +- **Performance Impact**: Large backfill operations may affect Kusto cluster performance; consider running during off-peak hours +- **Progress Monitoring**: Monitor the `spec.backfill.start` field to track progress +- **Ingestion Delay**: Backfill operations respect the same `ingestionDelay` setting as normal execution + +**Example Backfill Workflow:** + +1. **Initial State**: Rule processes new data normally +2. **Add Backfill**: Update the SummaryRule spec to include backfill configuration +3. **Monitor Progress**: Watch the `start` field advance through time windows +4. **Completion**: Backfill field is automatically removed when complete + +**Troubleshooting:** + +- **Stuck Progress**: Check Kusto operation status using the `operationId` in `spec.backfill.operationId` +- **Failed Operations**: Failed backfill operations will retry automatically by clearing the `operationId` +- **Invalid Dates**: Ensure start < end and both are within the 30-day lookback limit +- **Performance Issues**: Consider reducing the interval size or scheduling backfill during off-peak hours + --- ## Function diff --git a/docs/examples/README.md b/docs/examples/README.md new file mode 100644 index 000000000..85bb03c51 --- /dev/null +++ b/docs/examples/README.md @@ -0,0 +1,49 @@ +# SummaryRule Backfill Examples + +This directory contains examples and documentation for using the SummaryRule backfill feature. + +## Examples + +### Basic Usage +- **[summaryrule-backfill-basic.yaml](summaryrule-backfill-basic.yaml)** - Simple hourly aggregation with 30-day backfill +- **[summaryrule-backfill-advanced.yaml](summaryrule-backfill-advanced.yaml)** - Advanced example with ingestion delay and criteria +- **[summaryrule-backfill-migration.yaml](summaryrule-backfill-migration.yaml)** - Large-scale data migration using daily intervals + +### Documentation +- **[backfill-troubleshooting.md](backfill-troubleshooting.md)** - Comprehensive troubleshooting guide + +## Quick Start + +1. **Apply a basic backfill rule:** + ```bash + kubectl apply -f summaryrule-backfill-basic.yaml + ``` + +2. **Monitor progress:** + ```bash + # Watch the start time advance + kubectl get summaryrule basic-backfill-example -o jsonpath='{.spec.backfill.start}' -w + ``` + +3. **Check completion:** + ```bash + # Backfill is complete when the field is removed + kubectl get summaryrule basic-backfill-example -o jsonpath='{.spec.backfill}' 2>/dev/null || echo "Backfill complete" + ``` + +## Key Features + +- **Parallel Execution**: Backfill runs alongside normal interval execution +- **Automatic Progress**: System advances through time windows automatically +- **Auto-cleanup**: Backfill field is removed when complete +- **30-day Limit**: Maximum lookback period prevents excessive cluster load +- **Serial Processing**: One time window at a time prevents cluster overload + +## Important Notes + +- Backfill operations respect the same `ingestionDelay` as normal execution +- Maximum backfill period is 30 days +- Users are responsible for handling any data overlap concerns +- Monitor cluster performance during large backfill operations + +For detailed information, see the [CRD documentation](../crds.md#backfill-feature-for-historical-data-processing). diff --git a/docs/examples/backfill-troubleshooting.md b/docs/examples/backfill-troubleshooting.md new file mode 100644 index 000000000..0e47993e9 --- /dev/null +++ b/docs/examples/backfill-troubleshooting.md @@ -0,0 +1,203 @@ +# SummaryRule Backfill Troubleshooting Guide + +This guide helps diagnose and resolve common issues with SummaryRule backfill operations. + +## Common Issues and Solutions + +### Backfill Not Starting + +**Symptoms:** +- Backfill field is present but `spec.backfill.start` never advances +- No `operationId` appears in the backfill spec + +**Possible Causes & Solutions:** + +1. **Invalid Date Format** + ```bash + # Check the SummaryRule for validation errors + kubectl describe summaryrule -n + ``` + - Ensure dates are in RFC3339 format: `2024-01-01T00:00:00Z` + - Verify start < end and within 30-day limit + +2. **Criteria Mismatch** + - Check if the rule's criteria match the ingestor's cluster labels + - Verify ingestor is running with appropriate `--cluster-labels` + +3. **Rule Execution Prerequisites** + - Ensure the rule has had at least one normal execution first + - Check that the database and table exist in ADX + +### Backfill Stuck in Progress + +**Symptoms:** +- `spec.backfill.operationId` is present but not changing +- `spec.backfill.start` not advancing + +**Diagnosis Steps:** + +1. **Check Operation Status in ADX** + ```kql + .show operations + | where OperationId == "" + | project State, Status, Details + ``` + +2. **Check Ingestor Logs** + ```bash + kubectl logs -f deployment/ingestor -n adx-mon | grep -i backfill + ``` + +**Solutions:** + +- **Operation Failed**: The operation will retry automatically when the `operationId` is cleared +- **Operation Throttled**: Wait for ADX cluster resources to become available +- **Operation Timeout**: Operations older than 24 hours are automatically cleaned up + +### Backfill Performance Issues + +**Symptoms:** +- ADX cluster experiencing high load +- Query timeouts or throttling errors +- Normal rule execution being affected + +**Mitigation Strategies:** + +1. **Increase Interval Size** + ```yaml + spec: + interval: 1d # Process larger chunks less frequently + ``` + +2. **Add Ingestion Delay** + ```yaml + spec: + ingestionDelay: 1h # Reduce temporal overlap + ``` + +3. **Schedule During Off-Peak Hours** + - Temporarily remove backfill field during peak hours + - Re-add during low-traffic periods + +4. **Optimize KQL Query** + - Add filters to reduce data volume + - Use efficient aggregation functions + - Consider pre-filtering with materialized views + +### Invalid Backfill Configuration + +**Symptoms:** +- Kubernetes validation errors when applying the manifest +- CRD events showing validation failures + +**Common Validation Errors:** + +1. **Date Range Too Large** + ``` + Error: backfill period exceeds maximum 30-day limit + ``` + - Reduce the time span between start and end dates + +2. **Start After End** + ``` + Error: backfill start must be before end + ``` + - Verify the start date is earlier than the end date + +3. **Missing Required Fields** + ``` + Error: backfill start and end are required when backfill is specified + ``` + - Ensure both `start` and `end` fields are provided + +### Monitoring Backfill Progress + +**Progress Tracking:** +```bash +# Watch backfill progress +kubectl get summaryrule -o jsonpath='{.spec.backfill.start}' -w + +# Check current operation +kubectl get summaryrule -o jsonpath='{.spec.backfill.operationId}' +``` + +**Completion Detection:** +```bash +# Backfill is complete when the field is removed +kubectl get summaryrule -o jsonpath='{.spec.backfill}' || echo "Backfill complete" +``` + +**ADX Operation Monitoring:** +```kql +// Check recent backfill operations +.show operations +| where CommandType == "DataIngestPull" +| where Text contains "" +| project StartedOn, CompletedOn, State, OperationId +| order by StartedOn desc +``` + +## Best Practices for Reliable Backfill + +### 1. Plan Your Backfill Strategy +- Start with smaller time ranges to test performance +- Consider the impact on cluster resources +- Schedule backfill during off-peak hours + +### 2. Monitor Resource Usage +- Watch ADX cluster CPU and memory usage +- Monitor query performance and timeouts +- Check for throttling in ADX metrics + +### 3. Optimize for Performance +- Use appropriate interval sizes (larger intervals = fewer operations) +- Add filters to reduce data volume +- Consider ingestion delays to avoid temporal overlap + +### 4. Handle Failures Gracefully +- Failed operations will retry automatically +- Monitor logs for persistent failures +- Consider adjusting query complexity if timeouts occur + +### 5. Test Before Production +- Test backfill on development clusters first +- Validate query performance with sample data +- Verify time window calculations with small ranges + +## Advanced Troubleshooting + +### Manual Operation Cleanup +If an operation gets stuck and doesn't clear automatically: + +```bash +# Edit the SummaryRule to clear the operationId +kubectl patch summaryrule --type='json' \ + -p='[{"op": "remove", "path": "/spec/backfill/operationId"}]' +``` + +### Force Backfill Completion +To manually complete a backfill: + +```bash +# Remove the entire backfill field +kubectl patch summaryrule --type='json' \ + -p='[{"op": "remove", "path": "/spec/backfill"}]' +``` + +### Reset Backfill Progress +To restart backfill from a specific point: + +```bash +# Update the start time +kubectl patch summaryrule --type='json' \ + -p='[{"op": "replace", "path": "/spec/backfill/start", "value": "2024-01-15T00:00:00Z"}]' +``` + +## Getting Help + +If you encounter issues not covered in this guide: + +1. Check the [SummaryRule documentation](../crds.md#summaryrule) +2. Review ingestor logs for detailed error messages +3. Examine ADX operation details for query-specific issues +4. Consider reducing backfill scope or optimizing queries diff --git a/docs/examples/summaryrule-backfill-advanced.yaml b/docs/examples/summaryrule-backfill-advanced.yaml new file mode 100644 index 000000000..b6931a4fb --- /dev/null +++ b/docs/examples/summaryrule-backfill-advanced.yaml @@ -0,0 +1,51 @@ +# Advanced SummaryRule with Backfill and Ingestion Delay +# +# This example demonstrates a more complex backfill scenario with: +# - 15-minute ingestion delay to account for data latency +# - Shorter interval for more granular processing +# - Complex aggregation logic +# - Regional data processing + +apiVersion: adx-mon.azure.com/v1 +kind: SummaryRule +metadata: + name: advanced-backfill-example + namespace: adx-mon +spec: + database: TelemetryDB + table: RegionalPerformanceSummary + body: | + PerformanceMetrics + | where Timestamp between (_startTime .. _endTime) + | where Region == "_region" + | extend ResponseTimeMs = todecimal(ResponseTime) + | summarize + p50_response_time = percentile(ResponseTimeMs, 50), + p95_response_time = percentile(ResponseTimeMs, 95), + p99_response_time = percentile(ResponseTimeMs, 99), + total_requests = count(), + error_rate = countif(Status >= 400) * 100.0 / count() + by bin(Timestamp, 15m), Service, Region + interval: 15m + ingestionDelay: 15m + + # Process 7 days of historical data with ingestion delay + backfill: + start: "2024-02-01T00:00:00Z" + end: "2024-02-07T23:59:59Z" + + # Only run on production clusters in specific regions + criteria: + environment: + - production + region: + - eastus + - westus + +--- +# Notes: +# - The 15-minute ingestion delay ensures data is fully ingested before processing +# - Backfill will process data in 15-minute windows, respecting the same delay +# - The rule only executes on clusters labeled with production environment +# - Cluster label substitution replaces "_region" with the actual region value +# - Progress can be monitored by checking spec.backfill.start field diff --git a/docs/examples/summaryrule-backfill-basic.yaml b/docs/examples/summaryrule-backfill-basic.yaml new file mode 100644 index 000000000..a2769478c --- /dev/null +++ b/docs/examples/summaryrule-backfill-basic.yaml @@ -0,0 +1,36 @@ +# Basic SummaryRule with Backfill Example +# +# This example shows a basic hourly aggregation rule that processes +# historical data from January 1-31, 2024, while continuing to process +# new data in real-time. + +apiVersion: adx-mon.azure.com/v1 +kind: SummaryRule +metadata: + name: basic-backfill-example + namespace: adx-mon +spec: + database: Metrics + table: HourlyMetricSummary + body: | + RawMetrics + | where Timestamp between (_startTime .. _endTime) + | summarize + avg_value = avg(Value), + max_value = max(Value), + min_value = min(Value), + count = count() + by bin(Timestamp, 1h), MetricName + interval: 1h + + # Backfill configuration for historical data + backfill: + start: "2024-01-01T00:00:00Z" + end: "2024-01-31T23:59:59Z" + +--- +# The system will: +# 1. Continue normal hourly processing of new data +# 2. Process historical data one hour at a time from Jan 1-31, 2024 +# 3. Automatically advance the 'start' time after each successful window +# 4. Remove the entire 'backfill' field when complete diff --git a/docs/examples/summaryrule-backfill-migration.yaml b/docs/examples/summaryrule-backfill-migration.yaml new file mode 100644 index 000000000..5fda82641 --- /dev/null +++ b/docs/examples/summaryrule-backfill-migration.yaml @@ -0,0 +1,44 @@ +# Large-Scale Data Migration with Backfill +# +# This example shows how to use backfill for migrating or reprocessing +# large amounts of historical data with daily intervals to reduce +# Kusto cluster load. + +apiVersion: adx-mon.azure.com/v1 +kind: SummaryRule +metadata: + name: data-migration-backfill + namespace: adx-mon +spec: + database: AnalyticsDB + table: DailyBusinessMetrics + body: | + RawBusinessEvents + | where EventTime between (_startTime .. _endTime) + | where EventType in ("purchase", "signup", "cancellation") + | extend Date = bin(EventTime, 1d) + | summarize + daily_purchases = countif(EventType == "purchase"), + daily_signups = countif(EventType == "signup"), + daily_cancellations = countif(EventType == "cancellation"), + total_revenue = sumif(Revenue, EventType == "purchase"), + unique_customers = dcount(CustomerId) + by Date, ProductCategory + interval: 1d + + # Process 30 days (maximum allowed) of historical business data + backfill: + start: "2024-01-01T00:00:00Z" + end: "2024-01-30T23:59:59Z" + +--- +# Best Practices Demonstrated: +# 1. Using daily intervals reduces cluster load for large datasets +# 2. Maximum 30-day backfill period (enforced by validation) +# 3. Complex business logic aggregation suitable for batch processing +# 4. Clear table naming convention for the aggregated results +# +# Monitoring: +# - Check spec.backfill.start to see current progress (advances daily) +# - Check spec.backfill.operationId for current Kusto operation +# - Monitor cluster performance during backfill execution diff --git a/docs/index.md b/docs/index.md index 6b81857a4..ca50ca8fa 100644 --- a/docs/index.md +++ b/docs/index.md @@ -23,3 +23,4 @@ collection and analysis of observability data. * [Concepts](concepts.md) * [CRD Reference](crds.md) * [Cook Book](cookbook.md) +* [Examples](examples/) - YAML examples and guides From c423475469bab99a961612fbc53e4ba4647e2721 Mon Sep 17 00:00:00 2001 From: Jesse Thompson Date: Fri, 1 Aug 2025 19:49:59 +0000 Subject: [PATCH 8/9] Address PR review comments: improve code quality and reduce duplication - Format long KQL queries using multi-line strings for better readability - Add comprehensive documentation to helper functions with actual value examples - Decompose large functions into single-responsibility helpers - Extract shared utilities (ensureClockIsSet, getIngestionDelay, capWindowEndToCurrentTime) - Maintain architectural separation between backfill and normal execution time calculations - All tests passing, no behavioral changes --- api/v1/summaryrule_types.go | 82 ++++++++++++++++++++++--------------- ingestor/adx/tasks.go | 50 +++++++++++++++++----- 2 files changed, 87 insertions(+), 45 deletions(-) diff --git a/api/v1/summaryrule_types.go b/api/v1/summaryrule_types.go index ae7bdd302..65179d77f 100644 --- a/api/v1/summaryrule_types.go +++ b/api/v1/summaryrule_types.go @@ -39,7 +39,7 @@ const ( // the status of an async operation. This value is somewhat arbitrary, but the intent // is to not overwhelm the service with requests. SummaryRuleAsyncOperationPollInterval = 10 * time.Minute - // SummaryRuleMaxBackfillLookback is the maximum allowed lookback period for backfill operations + // SummaryRuleMaxBackfillLookback is the maximum allowed lookback period for backfill operations (30 days) SummaryRuleMaxBackfillLookback = 30 * 24 * time.Hour // 30 days ) @@ -294,9 +294,7 @@ func (s *SummaryRule) RemoveAsyncOperation(operationId string) { } func (s *SummaryRule) ShouldSubmitRule(clk clock.Clock) bool { - if clk == nil { - clk = clock.RealClock{} - } + clk = s.ensureClockIsSet(clk) lastSuccessfulEndTime := s.GetLastExecutionTime() cnd := s.GetCondition() @@ -337,30 +335,59 @@ func (s *SummaryRule) ShouldSubmitRule(clk clock.Clock) bool { // These represent two different architectural approaches that cannot be unified without // breaking existing behavior. See NextExecutionWindow() comment for more details. func (s *SummaryRule) calculateTimeWindow(clk clock.Clock, startPosition time.Time, maxEndTime *time.Time) (time.Time, time.Time) { - if clk == nil { - clk = clock.RealClock{} - } + clk = s.ensureClockIsSet(clk) - var delay time.Duration + delay := s.getIngestionDelay() + windowStartTime, windowEndTime := s.calculateIntervalBoundaries(startPosition) + windowEndTime = s.applyBoundaryConstraints(windowEndTime, maxEndTime) + + return s.applyIngestionDelayToWindow(windowStartTime, windowEndTime, delay) +} + +// getIngestionDelay returns the configured ingestion delay duration, or zero if not set +func (s *SummaryRule) getIngestionDelay() time.Duration { if s.Spec.IngestionDelay != nil { - delay = s.Spec.IngestionDelay.Duration + return s.Spec.IngestionDelay.Duration } + return 0 +} - // Align to interval boundaries like normal execution +// calculateIntervalBoundaries aligns the start position to interval boundaries +func (s *SummaryRule) calculateIntervalBoundaries(startPosition time.Time) (time.Time, time.Time) { intervalDuration := s.Spec.Interval.Duration windowStartTime := startPosition.UTC().Truncate(intervalDuration) windowEndTime := windowStartTime.Add(intervalDuration) + return windowStartTime, windowEndTime +} - // Apply boundary constraints if provided (don't exceed backfill end time) +// applyBoundaryConstraints ensures the window end doesn't exceed the maximum end time +func (s *SummaryRule) applyBoundaryConstraints(windowEndTime time.Time, maxEndTime *time.Time) time.Time { if maxEndTime != nil && windowEndTime.After(*maxEndTime) { - windowEndTime = *maxEndTime + return *maxEndTime } + return windowEndTime +} - // Apply ingestion delay to window - windowStartTime = windowStartTime.Add(-delay) - windowEndTime = windowEndTime.Add(-delay) +// applyIngestionDelayToWindow shifts both start and end times by the ingestion delay +func (s *SummaryRule) applyIngestionDelayToWindow(windowStartTime, windowEndTime time.Time, delay time.Duration) (time.Time, time.Time) { + return windowStartTime.Add(-delay), windowEndTime.Add(-delay) +} - return windowStartTime, windowEndTime +// ensureClockIsSet returns a real clock if the provided clock is nil +func (s *SummaryRule) ensureClockIsSet(clk clock.Clock) clock.Clock { + if clk == nil { + return clock.RealClock{} + } + return clk +} + +// capWindowEndToCurrentTime ensures the window end doesn't exceed current time (for normal execution) +func (s *SummaryRule) capWindowEndToCurrentTime(clk clock.Clock, windowEndTime time.Time, delay time.Duration) time.Time { + now := clk.Now().UTC().Add(-delay).Truncate(time.Minute) + if windowEndTime.After(now) { + return now + } + return windowEndTime } // NextExecutionWindow calculates the time window for normal interval-based execution. @@ -383,14 +410,8 @@ func (s *SummaryRule) calculateTimeWindow(clk clock.Clock, startPosition time.Ti // and cannot be unified without breaking existing behavior. Any attempt to share this // logic will break the comprehensive test suite that validates ingestion delay behavior. func (s *SummaryRule) NextExecutionWindow(clk clock.Clock) (windowStartTime time.Time, windowEndTime time.Time) { - if clk == nil { - clk = clock.RealClock{} - } - - var delay time.Duration - if s.Spec.IngestionDelay != nil { - delay = s.Spec.IngestionDelay.Duration - } + clk = s.ensureClockIsSet(clk) + delay := s.getIngestionDelay() lastSuccessfulEndTime := s.GetLastExecutionTime() if lastSuccessfulEndTime == nil { @@ -406,18 +427,13 @@ func (s *SummaryRule) NextExecutionWindow(clk clock.Clock) (windowStartTime time windowEndTime = windowStartTime.Add(s.Spec.Interval.Duration) // Ensure we don't execute future windows - now := clk.Now().UTC().Add(-delay).Truncate(time.Minute) - if windowEndTime.After(now) { - windowEndTime = now - } + windowEndTime = s.capWindowEndToCurrentTime(clk, windowEndTime, delay) } return } func (s *SummaryRule) BackfillAsyncOperations(clk clock.Clock) { - if clk == nil { - clk = clock.RealClock{} - } + clk = s.ensureClockIsSet(clk) // Get the last execution time as our starting point lastExecutionTime := s.GetLastExecutionTime() @@ -561,9 +577,7 @@ func (s *SummaryRule) IsBackfillComplete() bool { // Returns start time, end time, and whether a valid window exists // Uses the same interval alignment as normal execution func (s *SummaryRule) GetNextBackfillWindow(clk clock.Clock) (time.Time, time.Time, bool) { - if clk == nil { - clk = clock.RealClock{} - } + clk = s.ensureClockIsSet(clk) if !s.HasActiveBackfill() || s.IsBackfillComplete() { return time.Time{}, time.Time{}, false diff --git a/ingestor/adx/tasks.go b/ingestor/adx/tasks.go index 12fbe341b..2e0262b85 100644 --- a/ingestor/adx/tasks.go +++ b/ingestor/adx/tasks.go @@ -519,30 +519,53 @@ func (t *SummaryRuleTask) handleBackfillOperationStatus(ctx context.Context, rul // submitNextBackfillWindow submits the next backfill window for execution func (t *SummaryRuleTask) submitNextBackfillWindow(ctx context.Context, rule *v1.SummaryRule) error { - windowStartTime, windowEndTime, ok := rule.GetNextBackfillWindow(t.Clock) + windowStartTime, windowEndTime, ok := t.getNextBackfillWindow(rule) if !ok { - // No valid window, backfill might be complete - if rule.IsBackfillComplete() { - logger.Infof("Backfill complete for rule %s.%s, removing backfill configuration", rule.Spec.Database, rule.Name) - rule.RemoveBackfill() - } - return nil + return t.handleBackfillCompletion(rule) + } + + operationId, err := t.submitBackfillOperation(ctx, rule, windowStartTime, windowEndTime) + if err != nil { + return err + } + + t.recordBackfillOperationSubmission(rule, operationId, windowStartTime, windowEndTime) + return nil +} + +// getNextBackfillWindow retrieves the next backfill window and handles completion checking +func (t *SummaryRuleTask) getNextBackfillWindow(rule *v1.SummaryRule) (time.Time, time.Time, bool) { + return rule.GetNextBackfillWindow(t.Clock) +} + +// handleBackfillCompletion manages backfill completion when no valid window exists +func (t *SummaryRuleTask) handleBackfillCompletion(rule *v1.SummaryRule) error { + if rule.IsBackfillComplete() { + logger.Infof("Backfill complete for rule %s.%s, removing backfill configuration", rule.Spec.Database, rule.Name) + rule.RemoveBackfill() } + return nil +} +// submitBackfillOperation submits the backfill operation with proper time formatting +func (t *SummaryRuleTask) submitBackfillOperation(ctx context.Context, rule *v1.SummaryRule, windowStartTime, windowEndTime time.Time) (string, error) { // Subtract 1 tick from endTime for the query to avoid boundary issues queryEndTime := windowEndTime.Add(-kustoutil.OneTick) operationId, err := t.SubmitRule(ctx, *rule, windowStartTime.Format(time.RFC3339Nano), queryEndTime.Format(time.RFC3339Nano)) if err != nil { - return fmt.Errorf("failed to submit backfill operation for rule %s.%s (window %v to %v): %w", + return "", fmt.Errorf("failed to submit backfill operation for rule %s.%s (window %v to %v): %w", rule.Spec.Database, rule.Name, windowStartTime, windowEndTime, err) } + return operationId, nil +} + +// recordBackfillOperationSubmission logs the submission and updates the rule state +func (t *SummaryRuleTask) recordBackfillOperationSubmission(rule *v1.SummaryRule, operationId string, windowStartTime, windowEndTime time.Time) { rule.SetBackfillOperation(operationId) logger.Infof("Submitted backfill operation %s for rule %s.%s (window %v to %v)", operationId, rule.Spec.Database, rule.Name, windowStartTime, windowEndTime) - - return nil } func (t *SummaryRuleTask) trackAsyncOperations(ctx context.Context, rule *v1.SummaryRule, kustoAsyncOperations []AsyncOperationStatus) { @@ -676,7 +699,12 @@ func (t *SummaryRuleTask) getOperations(ctx context.Context) ([]AsyncOperationSt func (t *SummaryRuleTask) getOperation(ctx context.Context, operationId string) (*AsyncOperationStatus, error) { // Query for a specific operation using parameters to prevent injection - stmt := kql.New(".show operations | where OperationId == @ParamOperationId | summarize arg_max(LastUpdatedOn, OperationId, State, ShouldRetry, Status) | project LastUpdatedOn, OperationId = tostring(OperationId), State, ShouldRetry = todouble(ShouldRetry), Status") + stmt := kql.New(` +.show operations +| where OperationId == @ParamOperationId +| summarize arg_max(LastUpdatedOn, OperationId, State, ShouldRetry, Status) +| project LastUpdatedOn, OperationId = tostring(OperationId), State, ShouldRetry = todouble(ShouldRetry), Status +`) params := kql.NewParameters().AddString("ParamOperationId", operationId) rows, err := t.kustoCli.Mgmt(ctx, stmt, kusto.QueryParameters(params)) From 29a86f619b07f685a333c93f7283f8418d322a46 Mon Sep 17 00:00:00 2001 From: Jesse Thompson Date: Mon, 4 Aug 2025 13:37:47 +0000 Subject: [PATCH 9/9] Remove examples --- docs/examples/README.md | 49 ----- docs/examples/backfill-troubleshooting.md | 203 ------------------ .../summaryrule-backfill-advanced.yaml | 51 ----- docs/examples/summaryrule-backfill-basic.yaml | 36 ---- .../summaryrule-backfill-migration.yaml | 44 ---- docs/index.md | 1 - ingestor/adx/tasks.go | 7 +- 7 files changed, 1 insertion(+), 390 deletions(-) delete mode 100644 docs/examples/README.md delete mode 100644 docs/examples/backfill-troubleshooting.md delete mode 100644 docs/examples/summaryrule-backfill-advanced.yaml delete mode 100644 docs/examples/summaryrule-backfill-basic.yaml delete mode 100644 docs/examples/summaryrule-backfill-migration.yaml diff --git a/docs/examples/README.md b/docs/examples/README.md deleted file mode 100644 index 85bb03c51..000000000 --- a/docs/examples/README.md +++ /dev/null @@ -1,49 +0,0 @@ -# SummaryRule Backfill Examples - -This directory contains examples and documentation for using the SummaryRule backfill feature. - -## Examples - -### Basic Usage -- **[summaryrule-backfill-basic.yaml](summaryrule-backfill-basic.yaml)** - Simple hourly aggregation with 30-day backfill -- **[summaryrule-backfill-advanced.yaml](summaryrule-backfill-advanced.yaml)** - Advanced example with ingestion delay and criteria -- **[summaryrule-backfill-migration.yaml](summaryrule-backfill-migration.yaml)** - Large-scale data migration using daily intervals - -### Documentation -- **[backfill-troubleshooting.md](backfill-troubleshooting.md)** - Comprehensive troubleshooting guide - -## Quick Start - -1. **Apply a basic backfill rule:** - ```bash - kubectl apply -f summaryrule-backfill-basic.yaml - ``` - -2. **Monitor progress:** - ```bash - # Watch the start time advance - kubectl get summaryrule basic-backfill-example -o jsonpath='{.spec.backfill.start}' -w - ``` - -3. **Check completion:** - ```bash - # Backfill is complete when the field is removed - kubectl get summaryrule basic-backfill-example -o jsonpath='{.spec.backfill}' 2>/dev/null || echo "Backfill complete" - ``` - -## Key Features - -- **Parallel Execution**: Backfill runs alongside normal interval execution -- **Automatic Progress**: System advances through time windows automatically -- **Auto-cleanup**: Backfill field is removed when complete -- **30-day Limit**: Maximum lookback period prevents excessive cluster load -- **Serial Processing**: One time window at a time prevents cluster overload - -## Important Notes - -- Backfill operations respect the same `ingestionDelay` as normal execution -- Maximum backfill period is 30 days -- Users are responsible for handling any data overlap concerns -- Monitor cluster performance during large backfill operations - -For detailed information, see the [CRD documentation](../crds.md#backfill-feature-for-historical-data-processing). diff --git a/docs/examples/backfill-troubleshooting.md b/docs/examples/backfill-troubleshooting.md deleted file mode 100644 index 0e47993e9..000000000 --- a/docs/examples/backfill-troubleshooting.md +++ /dev/null @@ -1,203 +0,0 @@ -# SummaryRule Backfill Troubleshooting Guide - -This guide helps diagnose and resolve common issues with SummaryRule backfill operations. - -## Common Issues and Solutions - -### Backfill Not Starting - -**Symptoms:** -- Backfill field is present but `spec.backfill.start` never advances -- No `operationId` appears in the backfill spec - -**Possible Causes & Solutions:** - -1. **Invalid Date Format** - ```bash - # Check the SummaryRule for validation errors - kubectl describe summaryrule -n - ``` - - Ensure dates are in RFC3339 format: `2024-01-01T00:00:00Z` - - Verify start < end and within 30-day limit - -2. **Criteria Mismatch** - - Check if the rule's criteria match the ingestor's cluster labels - - Verify ingestor is running with appropriate `--cluster-labels` - -3. **Rule Execution Prerequisites** - - Ensure the rule has had at least one normal execution first - - Check that the database and table exist in ADX - -### Backfill Stuck in Progress - -**Symptoms:** -- `spec.backfill.operationId` is present but not changing -- `spec.backfill.start` not advancing - -**Diagnosis Steps:** - -1. **Check Operation Status in ADX** - ```kql - .show operations - | where OperationId == "" - | project State, Status, Details - ``` - -2. **Check Ingestor Logs** - ```bash - kubectl logs -f deployment/ingestor -n adx-mon | grep -i backfill - ``` - -**Solutions:** - -- **Operation Failed**: The operation will retry automatically when the `operationId` is cleared -- **Operation Throttled**: Wait for ADX cluster resources to become available -- **Operation Timeout**: Operations older than 24 hours are automatically cleaned up - -### Backfill Performance Issues - -**Symptoms:** -- ADX cluster experiencing high load -- Query timeouts or throttling errors -- Normal rule execution being affected - -**Mitigation Strategies:** - -1. **Increase Interval Size** - ```yaml - spec: - interval: 1d # Process larger chunks less frequently - ``` - -2. **Add Ingestion Delay** - ```yaml - spec: - ingestionDelay: 1h # Reduce temporal overlap - ``` - -3. **Schedule During Off-Peak Hours** - - Temporarily remove backfill field during peak hours - - Re-add during low-traffic periods - -4. **Optimize KQL Query** - - Add filters to reduce data volume - - Use efficient aggregation functions - - Consider pre-filtering with materialized views - -### Invalid Backfill Configuration - -**Symptoms:** -- Kubernetes validation errors when applying the manifest -- CRD events showing validation failures - -**Common Validation Errors:** - -1. **Date Range Too Large** - ``` - Error: backfill period exceeds maximum 30-day limit - ``` - - Reduce the time span between start and end dates - -2. **Start After End** - ``` - Error: backfill start must be before end - ``` - - Verify the start date is earlier than the end date - -3. **Missing Required Fields** - ``` - Error: backfill start and end are required when backfill is specified - ``` - - Ensure both `start` and `end` fields are provided - -### Monitoring Backfill Progress - -**Progress Tracking:** -```bash -# Watch backfill progress -kubectl get summaryrule -o jsonpath='{.spec.backfill.start}' -w - -# Check current operation -kubectl get summaryrule -o jsonpath='{.spec.backfill.operationId}' -``` - -**Completion Detection:** -```bash -# Backfill is complete when the field is removed -kubectl get summaryrule -o jsonpath='{.spec.backfill}' || echo "Backfill complete" -``` - -**ADX Operation Monitoring:** -```kql -// Check recent backfill operations -.show operations -| where CommandType == "DataIngestPull" -| where Text contains "" -| project StartedOn, CompletedOn, State, OperationId -| order by StartedOn desc -``` - -## Best Practices for Reliable Backfill - -### 1. Plan Your Backfill Strategy -- Start with smaller time ranges to test performance -- Consider the impact on cluster resources -- Schedule backfill during off-peak hours - -### 2. Monitor Resource Usage -- Watch ADX cluster CPU and memory usage -- Monitor query performance and timeouts -- Check for throttling in ADX metrics - -### 3. Optimize for Performance -- Use appropriate interval sizes (larger intervals = fewer operations) -- Add filters to reduce data volume -- Consider ingestion delays to avoid temporal overlap - -### 4. Handle Failures Gracefully -- Failed operations will retry automatically -- Monitor logs for persistent failures -- Consider adjusting query complexity if timeouts occur - -### 5. Test Before Production -- Test backfill on development clusters first -- Validate query performance with sample data -- Verify time window calculations with small ranges - -## Advanced Troubleshooting - -### Manual Operation Cleanup -If an operation gets stuck and doesn't clear automatically: - -```bash -# Edit the SummaryRule to clear the operationId -kubectl patch summaryrule --type='json' \ - -p='[{"op": "remove", "path": "/spec/backfill/operationId"}]' -``` - -### Force Backfill Completion -To manually complete a backfill: - -```bash -# Remove the entire backfill field -kubectl patch summaryrule --type='json' \ - -p='[{"op": "remove", "path": "/spec/backfill"}]' -``` - -### Reset Backfill Progress -To restart backfill from a specific point: - -```bash -# Update the start time -kubectl patch summaryrule --type='json' \ - -p='[{"op": "replace", "path": "/spec/backfill/start", "value": "2024-01-15T00:00:00Z"}]' -``` - -## Getting Help - -If you encounter issues not covered in this guide: - -1. Check the [SummaryRule documentation](../crds.md#summaryrule) -2. Review ingestor logs for detailed error messages -3. Examine ADX operation details for query-specific issues -4. Consider reducing backfill scope or optimizing queries diff --git a/docs/examples/summaryrule-backfill-advanced.yaml b/docs/examples/summaryrule-backfill-advanced.yaml deleted file mode 100644 index b6931a4fb..000000000 --- a/docs/examples/summaryrule-backfill-advanced.yaml +++ /dev/null @@ -1,51 +0,0 @@ -# Advanced SummaryRule with Backfill and Ingestion Delay -# -# This example demonstrates a more complex backfill scenario with: -# - 15-minute ingestion delay to account for data latency -# - Shorter interval for more granular processing -# - Complex aggregation logic -# - Regional data processing - -apiVersion: adx-mon.azure.com/v1 -kind: SummaryRule -metadata: - name: advanced-backfill-example - namespace: adx-mon -spec: - database: TelemetryDB - table: RegionalPerformanceSummary - body: | - PerformanceMetrics - | where Timestamp between (_startTime .. _endTime) - | where Region == "_region" - | extend ResponseTimeMs = todecimal(ResponseTime) - | summarize - p50_response_time = percentile(ResponseTimeMs, 50), - p95_response_time = percentile(ResponseTimeMs, 95), - p99_response_time = percentile(ResponseTimeMs, 99), - total_requests = count(), - error_rate = countif(Status >= 400) * 100.0 / count() - by bin(Timestamp, 15m), Service, Region - interval: 15m - ingestionDelay: 15m - - # Process 7 days of historical data with ingestion delay - backfill: - start: "2024-02-01T00:00:00Z" - end: "2024-02-07T23:59:59Z" - - # Only run on production clusters in specific regions - criteria: - environment: - - production - region: - - eastus - - westus - ---- -# Notes: -# - The 15-minute ingestion delay ensures data is fully ingested before processing -# - Backfill will process data in 15-minute windows, respecting the same delay -# - The rule only executes on clusters labeled with production environment -# - Cluster label substitution replaces "_region" with the actual region value -# - Progress can be monitored by checking spec.backfill.start field diff --git a/docs/examples/summaryrule-backfill-basic.yaml b/docs/examples/summaryrule-backfill-basic.yaml deleted file mode 100644 index a2769478c..000000000 --- a/docs/examples/summaryrule-backfill-basic.yaml +++ /dev/null @@ -1,36 +0,0 @@ -# Basic SummaryRule with Backfill Example -# -# This example shows a basic hourly aggregation rule that processes -# historical data from January 1-31, 2024, while continuing to process -# new data in real-time. - -apiVersion: adx-mon.azure.com/v1 -kind: SummaryRule -metadata: - name: basic-backfill-example - namespace: adx-mon -spec: - database: Metrics - table: HourlyMetricSummary - body: | - RawMetrics - | where Timestamp between (_startTime .. _endTime) - | summarize - avg_value = avg(Value), - max_value = max(Value), - min_value = min(Value), - count = count() - by bin(Timestamp, 1h), MetricName - interval: 1h - - # Backfill configuration for historical data - backfill: - start: "2024-01-01T00:00:00Z" - end: "2024-01-31T23:59:59Z" - ---- -# The system will: -# 1. Continue normal hourly processing of new data -# 2. Process historical data one hour at a time from Jan 1-31, 2024 -# 3. Automatically advance the 'start' time after each successful window -# 4. Remove the entire 'backfill' field when complete diff --git a/docs/examples/summaryrule-backfill-migration.yaml b/docs/examples/summaryrule-backfill-migration.yaml deleted file mode 100644 index 5fda82641..000000000 --- a/docs/examples/summaryrule-backfill-migration.yaml +++ /dev/null @@ -1,44 +0,0 @@ -# Large-Scale Data Migration with Backfill -# -# This example shows how to use backfill for migrating or reprocessing -# large amounts of historical data with daily intervals to reduce -# Kusto cluster load. - -apiVersion: adx-mon.azure.com/v1 -kind: SummaryRule -metadata: - name: data-migration-backfill - namespace: adx-mon -spec: - database: AnalyticsDB - table: DailyBusinessMetrics - body: | - RawBusinessEvents - | where EventTime between (_startTime .. _endTime) - | where EventType in ("purchase", "signup", "cancellation") - | extend Date = bin(EventTime, 1d) - | summarize - daily_purchases = countif(EventType == "purchase"), - daily_signups = countif(EventType == "signup"), - daily_cancellations = countif(EventType == "cancellation"), - total_revenue = sumif(Revenue, EventType == "purchase"), - unique_customers = dcount(CustomerId) - by Date, ProductCategory - interval: 1d - - # Process 30 days (maximum allowed) of historical business data - backfill: - start: "2024-01-01T00:00:00Z" - end: "2024-01-30T23:59:59Z" - ---- -# Best Practices Demonstrated: -# 1. Using daily intervals reduces cluster load for large datasets -# 2. Maximum 30-day backfill period (enforced by validation) -# 3. Complex business logic aggregation suitable for batch processing -# 4. Clear table naming convention for the aggregated results -# -# Monitoring: -# - Check spec.backfill.start to see current progress (advances daily) -# - Check spec.backfill.operationId for current Kusto operation -# - Monitor cluster performance during backfill execution diff --git a/docs/index.md b/docs/index.md index ca50ca8fa..6b81857a4 100644 --- a/docs/index.md +++ b/docs/index.md @@ -23,4 +23,3 @@ collection and analysis of observability data. * [Concepts](concepts.md) * [CRD Reference](crds.md) * [Cook Book](cookbook.md) -* [Examples](examples/) - YAML examples and guides diff --git a/ingestor/adx/tasks.go b/ingestor/adx/tasks.go index 2e0262b85..719823952 100644 --- a/ingestor/adx/tasks.go +++ b/ingestor/adx/tasks.go @@ -519,7 +519,7 @@ func (t *SummaryRuleTask) handleBackfillOperationStatus(ctx context.Context, rul // submitNextBackfillWindow submits the next backfill window for execution func (t *SummaryRuleTask) submitNextBackfillWindow(ctx context.Context, rule *v1.SummaryRule) error { - windowStartTime, windowEndTime, ok := t.getNextBackfillWindow(rule) + windowStartTime, windowEndTime, ok := rule.GetNextBackfillWindow(t.Clock) if !ok { return t.handleBackfillCompletion(rule) } @@ -533,11 +533,6 @@ func (t *SummaryRuleTask) submitNextBackfillWindow(ctx context.Context, rule *v1 return nil } -// getNextBackfillWindow retrieves the next backfill window and handles completion checking -func (t *SummaryRuleTask) getNextBackfillWindow(rule *v1.SummaryRule) (time.Time, time.Time, bool) { - return rule.GetNextBackfillWindow(t.Clock) -} - // handleBackfillCompletion manages backfill completion when no valid window exists func (t *SummaryRuleTask) handleBackfillCompletion(rule *v1.SummaryRule) error { if rule.IsBackfillComplete() {