Skip to content

Commit b63b869

Browse files
feat(live_agg): Enable pre-aggregation flow
1 parent 6ab2e39 commit b63b869

File tree

4 files changed

+23
-59
lines changed

4 files changed

+23
-59
lines changed

events-processor/processors/event_processors/enrichment_service.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ func (s *EventEnrichmentService) enrichWithSubscription(enrichedEvent *models.En
110110
}
111111

112112
func (s *EventEnrichmentService) enrichWithChargeInfo(enrichedEvent *models.EnrichedEvent) utils.Result[[]*models.EnrichedEvent] {
113-
// TODO(pre-aggregation): Remove the NotAPIPostProcessed condition to enable pre-aggregation
114-
if !enrichedEvent.InitialEvent.NotAPIPostProcessed() || enrichedEvent.Subscription == nil {
113+
if enrichedEvent.Subscription == nil {
115114
return utils.SuccessResult([]*models.EnrichedEvent{enrichedEvent})
116115
}
117116

events-processor/processors/event_processors/enrichment_service_test.go

Lines changed: 3 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -85,53 +85,6 @@ func TestEnrichEvent(t *testing.T) {
8585
assert.Equal(t, "Error fetching billable metric", result.ErrorMessage())
8686
})
8787

88-
t.Run("When event source is post processed on API and the result is successful", func(t *testing.T) {
89-
sqlmock, delete := setupTestEnv(t)
90-
defer delete()
91-
92-
properties := map[string]any{
93-
"api_requests": "12.0",
94-
}
95-
96-
event := models.Event{
97-
OrganizationID: "1a901a90-1a90-1a90-1a90-1a901a901a90",
98-
ExternalSubscriptionID: "sub_id",
99-
Code: "api_calls",
100-
Timestamp: 1741007009,
101-
Source: models.HTTP_RUBY,
102-
Properties: properties,
103-
SourceMetadata: &models.SourceMetadata{
104-
ApiPostProcess: true,
105-
},
106-
}
107-
108-
bm := models.BillableMetric{
109-
ID: "bm123",
110-
OrganizationID: event.OrganizationID,
111-
Code: event.Code,
112-
AggregationType: models.AggregationTypeSum,
113-
FieldName: "api_requests",
114-
Expression: "",
115-
CreatedAt: time.Now(),
116-
UpdatedAt: time.Now(),
117-
}
118-
mockBmLookup(sqlmock, &bm)
119-
120-
sub := models.Subscription{ID: "sub123", PlanID: "plan123"}
121-
mockSubscriptionLookup(sqlmock, &sub)
122-
123-
result := processor.EnrichEvent(&event)
124-
125-
assert.True(t, result.Success())
126-
assert.Equal(t, 1, len(result.Value()))
127-
128-
eventResult := result.Value()[0]
129-
assert.Equal(t, "12.0", *eventResult.Value)
130-
assert.Equal(t, "sum", eventResult.AggregationType)
131-
assert.Equal(t, "sub123", eventResult.SubscriptionID)
132-
assert.Equal(t, "plan123", eventResult.PlanID)
133-
})
134-
13588
t.Run("When timestamp is invalid", func(t *testing.T) {
13689
sqlmock, delete := setupTestEnv(t)
13790
defer delete()
@@ -194,7 +147,7 @@ func TestEnrichEvent(t *testing.T) {
194147
assert.Equal(t, "Error evaluating custom expression", result.ErrorMessage())
195148
})
196149

197-
t.Run("When event source is not post process on API", func(t *testing.T) {
150+
t.Run("With a flat filter", func(t *testing.T) {
198151
sqlmock, delete := setupTestEnv(t)
199152
defer delete()
200153

@@ -235,7 +188,7 @@ func TestEnrichEvent(t *testing.T) {
235188
assert.Equal(t, "12", *eventResult.Value)
236189
})
237190

238-
t.Run("When event source is not post process on API with multiple flat filters", func(t *testing.T) {
191+
t.Run("With multiple flat filters", func(t *testing.T) {
239192
sqlmock, delete := setupTestEnv(t)
240193
defer delete()
241194

@@ -308,7 +261,7 @@ func TestEnrichEvent(t *testing.T) {
308261
assert.Equal(t, map[string]string{}, eventResult2.GroupedBy)
309262
})
310263

311-
t.Run("When event source is not post process on API with a flat filter with pricing group keys", func(t *testing.T) {
264+
t.Run("With a flat filter with pricing group keys", func(t *testing.T) {
312265
sqlmock, delete := setupTestEnv(t)
313266
defer delete()
314267

events-processor/processors/events.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,11 @@ func processEvent(event *models.Event) utils.Result[*models.EnrichedEvent] {
9494

9595
go processor.ProducerService.ProduceEnrichedEvent(ctx, enrichedEvent)
9696

97-
// TODO(pre-aggregation): Uncomment to enable the feature
98-
// for _, ev := range enrichedEvents {
99-
// go processor.ProducerService.ProduceEnrichedExpendedEvent(ctx, ev)
100-
// }
97+
for _, ev := range enrichedEvents {
98+
if ev.ChargeID != nil {
99+
go processor.ProducerService.ProduceEnrichedExpendedEvent(ctx, ev)
100+
}
101+
}
101102

102103
if enrichedEvent.Subscription != nil && event.NotAPIPostProcessed() {
103104
payInAdvance := false

events-processor/processors/events_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,17 @@ func TestProcessEvent(t *testing.T) {
165165
sub := models.Subscription{ID: "sub123", PlanID: "plan123"}
166166
mockSubscriptionLookup(sqlmock, &sub)
167167

168+
mockFlatFiltersLookup(sqlmock, []*models.FlatFilter{
169+
{
170+
OrganizationID: event.OrganizationID,
171+
BillableMetricCode: event.Code,
172+
PlanID: "plan_id",
173+
ChargeID: "charge_idxx",
174+
ChargeUpdatedAt: time.Now(),
175+
PayInAdvance: true,
176+
},
177+
})
178+
168179
result := processEvent(&event)
169180

170181
assert.True(t, result.Success())
@@ -177,7 +188,7 @@ func TestProcessEvent(t *testing.T) {
177188
// TODO: Improve this by using channels in the producers methods
178189
time.Sleep(50 * time.Millisecond)
179190
assert.Equal(t, 1, testProducers.enrichedProducer.ExecutionCount)
180-
// TODO(pre-aggregation): assert.Equal(t, 1, testProducers.enrichedExpandedProducer.ExecutionCount)
191+
assert.Equal(t, 1, testProducers.enrichedExpandedProducer.ExecutionCount)
181192
})
182193

183194
t.Run("When event source is not post process on API when timestamp is invalid", func(t *testing.T) {
@@ -367,7 +378,7 @@ func TestProcessEvent(t *testing.T) {
367378
time.Sleep(50 * time.Millisecond)
368379
assert.Equal(t, 1, testProducers.inAdvanceProducer.ExecutionCount)
369380
assert.Equal(t, 1, testProducers.enrichedProducer.ExecutionCount)
370-
// TODO(pre-aggregation): assert.Equal(t, 1, testProducers.enrichedExpandedProducer.ExecutionCount)
381+
assert.Equal(t, 1, testProducers.enrichedExpandedProducer.ExecutionCount)
371382

372383
assert.Equal(t, 1, flagger.ExecutionCount)
373384
})
@@ -441,7 +452,7 @@ func TestProcessEvent(t *testing.T) {
441452
// TODO: Improve this by using channels in the producers methods
442453
time.Sleep(50 * time.Millisecond)
443454
assert.Equal(t, 1, testProducers.enrichedProducer.ExecutionCount)
444-
// TODO(pre-aggregation): assert.Equal(t, 2, testProducers.enrichedExpandedProducer.ExecutionCount)
455+
assert.Equal(t, 2, testProducers.enrichedExpandedProducer.ExecutionCount)
445456
})
446457

447458
t.Run("When event source is not post processed on API and it matches no charges", func(t *testing.T) {

0 commit comments

Comments
 (0)