From 26a32c2c3ca1aa94fb16fb7a987cba7f7308b00f Mon Sep 17 00:00:00 2001
From: Gao Hongtao
Date: Sun, 21 Sep 2025 08:27:28 +0800
Subject: [PATCH] Fix memory leaks and OOM issues in streaming processing

Fix the issues by implementing deduplication logic in priority queues
and improving sliding window memory management.

Signed-off-by: Gao Hongtao
---
 CHANGES.md                                |   1 +
 banyand/measure/write_liaison.go          |   8 +-
 banyand/queue/pub/pub.go                  |  11 +-
 banyand/stream/write_liaison.go           |   8 +-
 pkg/flow/dedup_priority_queue.go          |  54 +++-
 pkg/flow/dedup_priority_queue_test.go     | 370 ++++++++++++++++++++++
 pkg/flow/streaming/sliding_window.go      | 110 +++----
 pkg/flow/streaming/sliding_window_test.go | 137 ++++++++
 pkg/flow/types.go                         |   4 ++--
 9 files changed, 635 insertions(+), 68 deletions(-)
 create mode 100644 pkg/flow/dedup_priority_queue_test.go

diff --git a/CHANGES.md b/CHANGES.md
index d788fc11b..c48ab2c0c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -57,6 +57,7 @@ Release Notes.
 - Fix topN parsing panic when the criteria is set.
 - Remove the indexed_only field in TagSpec.
 - Fix returning empty result when using IN operatior on the array type tags.
+- Fix memory leaks and OOM issues in streaming processing by implementing deduplication logic in priority queues and improving sliding window memory management.
 
 ### Document
 
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 4b360c78a..7b287ff51 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -131,9 +131,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
 	// Send to all nodes for this shard
 	for _, node := range nodes {
 		message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
-		_, publishErr := w.tire2Client.Publish(ctx, topic, message)
+		future, publishErr := w.tire2Client.Publish(ctx, topic, message)
 		if publishErr != nil {
 			w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
+			continue
+		}
+		_, err := future.Get()
+		if err != nil {
+			w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
+			continue
 		}
 	}
 }
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 74fa2432f..e8a397ed0 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -229,7 +229,9 @@ type publishResult struct {
 
 func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
 	var err error
-	f := &future{}
+	f := &future{
+		log: p.log,
+	}
 	handleMessage := func(m bus.Message, err error) error {
 		r, errSend := messageToRequest(topic, m)
 		if errSend != nil {
@@ -362,6 +364,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
 }
 
 type future struct {
+	log      *logger.Logger
 	clients  []clusterv1.Service_SendClient
 	cancelFn []func()
 	topics   []bus.Topic
@@ -372,10 +375,16 @@ func (l *future) Get() (bus.Message, error) {
 	if len(l.clients) < 1 {
 		return bus.Message{}, io.EOF
 	}
+	c := l.clients[0]
 	t := l.topics[0]
 	n := l.nodes[0]
+	defer func() {
+		if err := c.CloseSend(); err != nil {
+			l.log.Error().Err(err).Msg("failed to close send stream")
+		}
+	}()
 	l.clients = l.clients[1:]
 	l.topics = l.topics[1:]
 	l.cancelFn[0]()
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index d8478f1b1..c78d1a375 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -204,9 +204,15 @@ func (w
*writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp // Send to all nodes for this shard for _, node := range nodes { message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData) - _, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message) + future, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message) if publishErr != nil { w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node") + continue + } + _, err := future.Get() + if err != nil { + w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish") + continue } } } diff --git a/pkg/flow/dedup_priority_queue.go b/pkg/flow/dedup_priority_queue.go index 527ab6f51..becbd55d8 100644 --- a/pkg/flow/dedup_priority_queue.go +++ b/pkg/flow/dedup_priority_queue.go @@ -31,11 +31,21 @@ type Element interface { SetIndex(int) } +// HashableElement represents an element that can be hashed and compared for equality. +type HashableElement interface { + Element + // Hash returns a hash value for this element + Hash() uint64 + // Equal compares this element with another for content equality + Equal(HashableElement) bool +} + // DedupPriorityQueue implements heap.Interface. // DedupPriorityQueue is not thread-safe. type DedupPriorityQueue struct { comparator utils.Comparator cache map[Element]struct{} + hashCache map[uint64][]HashableElement // For content-based deduplication Items []Element allowDuplicates bool } @@ -46,6 +56,7 @@ func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupP comparator: comparator, Items: make([]Element, 0), cache: make(map[Element]struct{}), + hashCache: make(map[uint64][]HashableElement), allowDuplicates: allowDuplicates, } } @@ -60,6 +71,9 @@ func (pq *DedupPriorityQueue) Less(i, j int) bool { // Swap exchanges indexes of the items. 
func (pq *DedupPriorityQueue) Swap(i, j int) { + if i < 0 || i >= len(pq.Items) || j < 0 || j >= len(pq.Items) { + panic("index out of range in DedupPriorityQueue.Swap") + } pq.Items[i], pq.Items[j] = pq.Items[j], pq.Items[i] pq.Items[i].SetIndex(i) pq.Items[j].SetIndex(j) @@ -71,11 +85,29 @@ func (pq *DedupPriorityQueue) Push(x interface{}) { item := x.(Element) // if duplicates is not allowed if !pq.allowDuplicates { - // use mutex to protect cache and items - // check existence + // Check for reference-based duplicates first if _, ok := pq.cache[item]; ok { return } + + // Check for content-based duplicates if the item implements HashableElement + if hashableItem, ok := item.(HashableElement); ok { + hash := hashableItem.Hash() + if existingItems, exists := pq.hashCache[hash]; exists { + // Check if any existing item has the same content + for _, existing := range existingItems { + if hashableItem.Equal(existing) { + return // Duplicate found, don't add + } + } + // No duplicate found, add to hash cache + pq.hashCache[hash] = append(pq.hashCache[hash], hashableItem) + } else { + // First item with this hash + pq.hashCache[hash] = []HashableElement{hashableItem} + } + } + pq.cache[item] = struct{}{} } n := len(pq.Items) @@ -90,6 +122,24 @@ func (pq *DedupPriorityQueue) Pop() interface{} { item := pq.Items[n-1] item.SetIndex(-1) // for safety delete(pq.cache, item) + + // Clean up hash cache if item implements HashableElement + if hashableItem, ok := item.(HashableElement); ok { + hash := hashableItem.Hash() + if existingItems, exists := pq.hashCache[hash]; exists { + // Remove the specific item from the hash cache + for i, existing := range existingItems { + if hashableItem.Equal(existing) { + pq.hashCache[hash] = append(existingItems[:i], existingItems[i+1:]...) + if len(pq.hashCache[hash]) == 0 { + delete(pq.hashCache, hash) + } + break + } + } + } + } + pq.Items = pq.Items[0 : n-1] return item } diff --git a/pkg/flow/dedup_priority_queue_test.go b/pkg/flow/dedup_priority_queue_test.go new file mode 100644 index 000000000..d79b01870 --- /dev/null +++ b/pkg/flow/dedup_priority_queue_test.go @@ -0,0 +1,370 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package flow + +import ( + "container/heap" + "fmt" + "testing" +) + +type testHashableElement struct { + id string + val int + index int +} + +func (t *testHashableElement) GetIndex() int { + return t.index +} + +func (t *testHashableElement) SetIndex(idx int) { + t.index = idx +} + +func (t *testHashableElement) Hash() uint64 { + // Simple hash based on id + hash := uint64(0) + for _, b := range []byte(t.id) { + hash = hash*31 + uint64(b) + } + return hash +} + +func (t *testHashableElement) Equal(other HashableElement) bool { + if otherElem, ok := other.(*testHashableElement); ok { + return t.id == otherElem.id + } + return false +} + +type testElement struct { + val int + index int +} + +func (t *testElement) GetIndex() int { + return t.index +} + +func (t *testElement) SetIndex(idx int) { + t.index = idx +} + +func TestHashableElementHash(t *testing.T) { + elem1 := &testHashableElement{id: "test"} + elem2 := &testHashableElement{id: "test"} + elem3 := &testHashableElement{id: "different"} + + if elem1.Hash() != elem2.Hash() { + t.Errorf("Expected same hash for elements with same id, got %d and %d", elem1.Hash(), elem2.Hash()) + } + + if elem1.Hash() == elem3.Hash() { + t.Errorf("Expected different hash for elements with different id, got same hash %d", elem1.Hash()) + } +} + +func TestHashableElementEqual(t *testing.T) { + elem1 := &testHashableElement{id: "test", val: 10} + elem2 := &testHashableElement{id: "test", val: 20} // Same id, different val + elem3 := &testHashableElement{id: "different", val: 10} // Different id, same val + + // Same id should be equal regardless of other fields + if !elem1.Equal(elem2) { + t.Error("Expected elements with same id to be equal") + } + + // Different id should not be equal + if elem1.Equal(elem3) { + t.Error("Expected elements with different id to not be equal") + } +} + +func TestDedupPriorityQueue_ContentBasedDeduplication(t *testing.T) { + // Create a priority queue with deduplication enabled + pq := NewPriorityQueue(func(a, b interface{}) int { + return a.(*testHashableElement).val - b.(*testHashableElement).val + }, false) + + // Create two elements with the same id (should be deduplicated) + elem1 := &testHashableElement{id: "test", val: 10} + elem2 := &testHashableElement{id: "test", val: 10} + + // Push both elements + heap.Push(pq, elem1) + heap.Push(pq, elem2) + + // Should only have one element in the heap + if pq.Len() != 1 { + t.Errorf("Expected 1 element in heap after deduplication, got %d", pq.Len()) + } + + // Create elements with different ids (should not be deduplicated) + elem3 := &testHashableElement{id: "different", val: 20} + heap.Push(pq, elem3) + + // Should now have two elements + if pq.Len() != 2 { + t.Errorf("Expected 2 elements in heap after adding different element, got %d", pq.Len()) + } +} + +func TestDedupPriorityQueue_AllowDuplicates(t *testing.T) { + // Create a priority queue with duplicates allowed + pq := NewPriorityQueue(func(a, b interface{}) int { + return a.(*testHashableElement).val - b.(*testHashableElement).val + }, true) + + // Create two elements with the same id + elem1 := &testHashableElement{id: "test", val: 10} + elem2 := &testHashableElement{id: "test", val: 10} + + // Push both elements + heap.Push(pq, elem1) + heap.Push(pq, elem2) + + // Should have two elements in the heap (duplicates allowed) + if pq.Len() != 2 { + t.Errorf("Expected 2 elements in heap when duplicates are allowed, got %d", pq.Len()) + } + + // Even pushing the same reference should be allowed + heap.Push(pq, elem1) + if 
pq.Len() != 3 { + t.Errorf("Expected 3 elements in heap after pushing same reference when duplicates allowed, got %d", pq.Len()) + } +} + +func TestDedupPriorityQueue_HashCollisionHandling(t *testing.T) { + // Create a priority queue with deduplication enabled + pq := NewPriorityQueue(func(a, b interface{}) int { + return a.(*testHashableElement).val - b.(*testHashableElement).val + }, false) + + // Create elements that might have hash collisions + elem1 := &testHashableElement{id: "ab", val: 10} // Hash might collide with "ba" + elem2 := &testHashableElement{id: "ba", val: 10} // Different id, might have same hash + elem3 := &testHashableElement{id: "ab", val: 20} // Same id as elem1, should be deduplicated + + heap.Push(pq, elem1) + heap.Push(pq, elem2) + heap.Push(pq, elem3) + + // Should have 2 elements: elem1 (or elem3, they're equivalent) and elem2 + if pq.Len() != 2 { + t.Errorf("Expected 2 elements after hash collision test, got %d", pq.Len()) + } +} + +func TestDedupPriorityQueue_PopCleanup(t *testing.T) { + // Create a priority queue with deduplication enabled + pq := NewPriorityQueue(func(a, b interface{}) int { + return a.(*testHashableElement).val - b.(*testHashableElement).val + }, false) + + // Add elements + elem1 := &testHashableElement{id: "test1", val: 10} + elem2 := &testHashableElement{id: "test2", val: 20} + elem3 := &testHashableElement{id: "test1", val: 30} // Same id as elem1 + + heap.Push(pq, elem1) + heap.Push(pq, elem2) + heap.Push(pq, elem3) // Should be deduplicated + + // Should have 2 elements (elem1 and elem2) + if pq.Len() != 2 { + t.Errorf("Expected 2 elements after deduplication, got %d", pq.Len()) + } + + // Pop an element + popped := heap.Pop(pq).(*testHashableElement) + + // Should have 1 element left + if pq.Len() != 1 { + t.Errorf("Expected 1 element after pop, got %d", pq.Len()) + } + + // Try to add the same element again - should be allowed since it was popped + heap.Push(pq, popped) + + // Should now have 2 elements again + if pq.Len() != 2 { + t.Errorf("Expected 2 elements after re-adding popped element, got %d", pq.Len()) + } +} + +func TestDedupPriorityQueue_MixedElementTypes(t *testing.T) { + // Create a priority queue that can handle both Element and HashableElement + pq := NewPriorityQueue(func(a, b interface{}) int { + switch va := a.(type) { + case *testHashableElement: + switch vb := b.(type) { + case *testHashableElement: + return va.val - vb.val + case *testElement: + return va.val - vb.val + } + case *testElement: + switch vb := b.(type) { + case *testHashableElement: + return va.val - vb.val + case *testElement: + return va.val - vb.val + } + } + return 0 + }, false) + + // Add mixed element types + hashElem1 := &testHashableElement{id: "test", val: 10} + hashElem2 := &testHashableElement{id: "test", val: 20} // Same id, should be deduplicated + regularElem := &testElement{val: 15} + + heap.Push(pq, hashElem1) + heap.Push(pq, regularElem) + heap.Push(pq, hashElem2) // Should be deduplicated with hashElem1 + + // Should have 2 elements: hashElem1 and regularElem + if pq.Len() != 2 { + t.Errorf("Expected 2 elements with mixed types, got %d", pq.Len()) + } +} + +func TestDedupPriorityQueue_EmptyHeapOperations(t *testing.T) { + // Create an empty priority queue + pq := NewPriorityQueue(func(a, b interface{}) int { + return a.(*testHashableElement).val - b.(*testHashableElement).val + }, false) + + // Test operations on empty heap + if pq.Len() != 0 { + t.Errorf("Expected empty heap to have length 0, got %d", pq.Len()) + } + + if 
pq.Peek() != nil { + t.Error("Expected Peek() on empty heap to return nil") + } + + // Test that Pop panics on empty heap + defer func() { + if r := recover(); r == nil { + t.Error("Expected Pop() on empty heap to panic") + } + }() + heap.Pop(pq) +} + +func TestDedupPriorityQueue_PriorityOrdering(t *testing.T) { + // Test that deduplication doesn't affect priority ordering + pq := NewPriorityQueue(func(a, b interface{}) int { + return a.(*testHashableElement).val - b.(*testHashableElement).val + }, false) + + // Add elements in random order + elem1 := &testHashableElement{id: "high", val: 30} + elem2 := &testHashableElement{id: "low", val: 10} + elem3 := &testHashableElement{id: "high", val: 40} // Same id as elem1, should be deduplicated + elem4 := &testHashableElement{id: "medium", val: 20} + + heap.Push(pq, elem1) + heap.Push(pq, elem2) + heap.Push(pq, elem3) // Should be deduplicated + heap.Push(pq, elem4) + + // Should have 3 elements (elem3 deduplicated) + if pq.Len() != 3 { + t.Errorf("Expected 3 elements after deduplication, got %d", pq.Len()) + } + + // Pop elements and verify they come out in priority order + expectedOrder := []int{10, 20, 30} // elem2, elem4, elem1 + for i, expectedVal := range expectedOrder { + if pq.Len() == 0 { + t.Errorf("Expected more elements, but heap is empty at position %d", i) + break + } + popped := heap.Pop(pq).(*testHashableElement) + if popped.val != expectedVal { + t.Errorf("Expected value %d at position %d, got %d", expectedVal, i, popped.val) + } + } +} + +func TestDedupPriorityQueue_LargeScaleDeduplication(t *testing.T) { + // Test deduplication with many elements + pq := NewPriorityQueue(func(a, b interface{}) int { + return a.(*testHashableElement).val - b.(*testHashableElement).val + }, false) + + // Add 100 elements, but only 10 unique ids + for i := 0; i < 100; i++ { + id := "id" + string(rune(i%10+'0')) // id0, id1, ..., id9, id0, id1, ... 
+ elem := &testHashableElement{id: id, val: i} + heap.Push(pq, elem) + } + + // Should have only 10 unique elements + if pq.Len() != 10 { + t.Errorf("Expected 10 unique elements, got %d", pq.Len()) + } + + // Verify all remaining elements have unique ids + seen := make(map[string]bool) + for pq.Len() > 0 { + elem := heap.Pop(pq).(*testHashableElement) + if seen[elem.id] { + t.Errorf("Found duplicate id %s in final heap", elem.id) + } + seen[elem.id] = true + } + + // Should have seen exactly 10 unique ids + if len(seen) != 10 { + t.Errorf("Expected 10 unique ids, got %d", len(seen)) + } +} + +func BenchmarkDedupPriorityQueue_Push(b *testing.B) { + pq := NewPriorityQueue(func(a, b interface{}) int { + return a.(*testHashableElement).val - b.(*testHashableElement).val + }, false) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + elem := &testHashableElement{id: "benchmark", val: i} + heap.Push(pq, elem) + } +} + +func BenchmarkDedupPriorityQueue_Pop(b *testing.B) { + pq := NewPriorityQueue(func(a, b interface{}) int { + return a.(*testHashableElement).val - b.(*testHashableElement).val + }, false) + + // Pre-populate with unique elements + for i := 0; i < b.N; i++ { + elem := &testHashableElement{id: fmt.Sprintf("unique_%d", i), val: i} + heap.Push(pq, elem) + } + + b.ResetTimer() + for i := 0; i < b.N && pq.Len() > 0; i++ { + heap.Pop(pq) + } +} diff --git a/pkg/flow/streaming/sliding_window.go b/pkg/flow/streaming/sliding_window.go index ed012084c..b049dd5da 100644 --- a/pkg/flow/streaming/sliding_window.go +++ b/pkg/flow/streaming/sliding_window.go @@ -142,7 +142,7 @@ func (s *tumblingTimeWindows) flushDueWindows() { defer s.timerMu.Unlock() for { if lookAhead, ok := s.timerHeap.Peek().(*internalTimer); ok { - if lookAhead.triggerTimeMillis <= s.currentWatermark { + if lookAhead.w.MaxTimestamp() <= s.currentWatermark { oldestTimer := heap.Pop(s.timerHeap).(*internalTimer) s.flushWindow(oldestTimer.w) continue @@ -162,37 +162,32 @@ func (s *tumblingTimeWindows) receive() { defer s.Done() for elem := range s.in { - assignedWindows, err := s.AssignWindows(elem.TimestampMillis()) + assignedWindow, err := s.AssignWindows(elem.TimestampMillis()) if err != nil { s.errorHandler(err) continue } - ctx := triggerContext{ - delegation: s, + // drop if the window is late + if s.isWindowLate(assignedWindow) { + continue } - for _, w := range assignedWindows { - // drop if the window is late - if s.isWindowLate(w) { - continue - } - tw := w.(timeWindow) - ctx.window = tw - // add elem to the bucket - if oldAggr, ok := s.snapshots.Get(tw); ok { - oldAggr.(flow.AggregationOp).Add([]flow.StreamRecord{elem}) - } else { - newAggr := s.aggregationFactory() - newAggr.Add([]flow.StreamRecord{elem}) - s.snapshots.Add(tw, newAggr) - if e := s.l.Debug(); e.Enabled() { - e.Stringer("window", tw).Msg("create new window") - } + tw := assignedWindow.(timeWindow) + // add elem to the bucket + if oldAggr, ok := s.snapshots.Get(tw); ok { + oldAggr.(flow.AggregationOp).Add([]flow.StreamRecord{elem}) + } else { + newAggr := s.aggregationFactory() + newAggr.Add([]flow.StreamRecord{elem}) + s.snapshots.Add(tw, newAggr) + if e := s.l.Debug(); e.Enabled() { + e.Stringer("window", tw).Msg("create new window") } + } - result := ctx.OnElement(elem) - if result == fire { - s.flushWindow(tw) - } + result := s.eventTimeTriggerOnElement(tw) + + if result == fire { + s.flushWindow(tw) } // even if the incoming elements do not follow strict order, @@ -260,7 +255,7 @@ func NewTumblingTimeWindows(size time.Duration, maxFlushInterval 
time.Duration) return &tumblingTimeWindows{ windowSize: ws, timerHeap: flow.NewPriorityQueue(func(a, b interface{}) int { - return int(a.(*internalTimer).triggerTimeMillis - b.(*internalTimer).triggerTimeMillis) + return int(a.(*internalTimer).w.MaxTimestamp() - b.(*internalTimer).w.MaxTimestamp()) }, false), in: make(chan flow.StreamRecord), out: make(chan flow.StreamRecord), @@ -285,14 +280,12 @@ func (t timeWindow) String() string { } // AssignWindows assigns windows according to the given timestamp. -func (s *tumblingTimeWindows) AssignWindows(timestamp int64) ([]flow.Window, error) { +func (s *tumblingTimeWindows) AssignWindows(timestamp int64) (flow.Window, error) { if timestamp > math.MinInt64 { start := getWindowStart(timestamp, s.windowSize) - return []flow.Window{ - timeWindow{ - start: start, - end: start + s.windowSize, - }, + return timeWindow{ + start: start, + end: start + s.windowSize, }, nil } return nil, errors.New("invalid timestamp from the element") @@ -305,43 +298,27 @@ func getWindowStart(timestamp, windowSize int64) int64 { } // eventTimeTriggerOnElement processes element(s) with EventTimeTrigger. -func eventTimeTriggerOnElement(window timeWindow, ctx *triggerContext) triggerResult { - if window.MaxTimestamp() <= ctx.GetCurrentWatermark() { +func (s *tumblingTimeWindows) eventTimeTriggerOnElement(window timeWindow) triggerResult { + if window.MaxTimestamp() <= s.currentWatermark { // if watermark is already past the window fire immediately return fire } - ctx.RegisterEventTimeTimer(window.MaxTimestamp()) - return cont -} - -type triggerContext struct { - delegation *tumblingTimeWindows - window timeWindow -} - -func (ctx *triggerContext) GetCurrentWatermark() int64 { - return ctx.delegation.currentWatermark -} - -func (ctx *triggerContext) RegisterEventTimeTimer(triggerTime int64) { - ctx.delegation.timerMu.Lock() - defer ctx.delegation.timerMu.Unlock() - heap.Push(ctx.delegation.timerHeap, &internalTimer{ - triggerTimeMillis: triggerTime, - w: ctx.window, + s.timerMu.Lock() + defer s.timerMu.Unlock() + heap.Push(s.timerHeap, &internalTimer{ + w: window, }) + return cont } -func (ctx *triggerContext) OnElement(_ flow.StreamRecord) triggerResult { - return eventTimeTriggerOnElement(ctx.window, ctx) -} - -var _ flow.Element = (*internalTimer)(nil) +var ( + _ flow.Element = (*internalTimer)(nil) + _ flow.HashableElement = (*internalTimer)(nil) +) type internalTimer struct { - w timeWindow - triggerTimeMillis int64 - index int + w timeWindow + index int } func (t *internalTimer) GetIndex() int { @@ -351,3 +328,14 @@ func (t *internalTimer) GetIndex() int { func (t *internalTimer) SetIndex(idx int) { t.index = idx } + +func (t *internalTimer) Equal(other flow.HashableElement) bool { + if otherTimer, ok := other.(*internalTimer); ok { + return t.w.start == otherTimer.w.start && t.w.end == otherTimer.w.end + } + return false +} + +func (t *internalTimer) Hash() uint64 { + return uint64(t.w.start)<<32 | uint64(t.w.end) +} diff --git a/pkg/flow/streaming/sliding_window_test.go b/pkg/flow/streaming/sliding_window_test.go index cee7973d5..28077bc82 100644 --- a/pkg/flow/streaming/sliding_window_test.go +++ b/pkg/flow/streaming/sliding_window_test.go @@ -18,6 +18,7 @@ package streaming import ( + "container/heap" "context" "time" @@ -136,4 +137,140 @@ var _ = g.Describe("Sliding Window", func() { }).WithTimeout(flags.EventuallyTimeout).Should(gomega.Succeed()) }) }) + + g.Describe("Timer Heap Deduplication", func() { + var timerHeap *flow.DedupPriorityQueue + + 
g.BeforeEach(func() { + timerHeap = flow.NewPriorityQueue(func(a, b interface{}) int { + return int(a.(*internalTimer).w.MaxTimestamp() - b.(*internalTimer).w.MaxTimestamp()) + }, false) + }) + + g.It("Should deduplicate same reference internalTimer objects", func() { + timer1 := &internalTimer{ + w: timeWindow{start: 1000, end: 2000}, + } + + // Push the same reference twice + heap.Push(timerHeap, timer1) + heap.Push(timerHeap, timer1) + + // Should only have one item due to reference-based deduplication + gomega.Expect(timerHeap.Len()).Should(gomega.Equal(1)) + }) + + g.It("Should deduplicate different internalTimer objects with same window content", func() { + timer1 := &internalTimer{ + w: timeWindow{start: 1000, end: 2000}, + } + timer2 := &internalTimer{ + w: timeWindow{start: 1000, end: 2000}, // Same window content + } + + // Push different objects with same content + heap.Push(timerHeap, timer1) + heap.Push(timerHeap, timer2) + + // Should only have one item due to content-based deduplication + gomega.Expect(timerHeap.Len()).Should(gomega.Equal(1)) + }) + + g.It("Should not deduplicate internalTimer objects with different windows", func() { + timer1 := &internalTimer{ + w: timeWindow{start: 1000, end: 2000}, + } + timer2 := &internalTimer{ + w: timeWindow{start: 2000, end: 3000}, // Different window + } + + // Push different objects with different content + heap.Push(timerHeap, timer1) + heap.Push(timerHeap, timer2) + + // Should have two items as they have different content + gomega.Expect(timerHeap.Len()).Should(gomega.Equal(2)) + }) + + g.It("Should maintain proper ordering after deduplication", func() { + timer1 := &internalTimer{ + w: timeWindow{start: 3000, end: 4000}, // Later timestamp + } + timer2 := &internalTimer{ + w: timeWindow{start: 1000, end: 2000}, // Earlier timestamp + } + timer3 := &internalTimer{ + w: timeWindow{start: 1000, end: 2000}, // Duplicate of timer2 + } + + // Push in order: later, earlier, duplicate + heap.Push(timerHeap, timer1) + heap.Push(timerHeap, timer2) + heap.Push(timerHeap, timer3) // Should be deduplicated + + // Should only have 2 items + gomega.Expect(timerHeap.Len()).Should(gomega.Equal(2)) + + // Peek should return the earliest timer (timer2) + earliest := timerHeap.Peek().(*internalTimer) + gomega.Expect(earliest.w.start).Should(gomega.Equal(int64(1000))) + gomega.Expect(earliest.w.end).Should(gomega.Equal(int64(2000))) + }) + + g.It("Should verify Hash and Equal methods work correctly", func() { + timer1 := &internalTimer{ + w: timeWindow{start: 1000, end: 2000}, + } + timer2 := &internalTimer{ + w: timeWindow{start: 1000, end: 2000}, // Same content + } + timer3 := &internalTimer{ + w: timeWindow{start: 1000, end: 3000}, // Different end + } + + // Test Hash method + gomega.Expect(timer1.Hash()).Should(gomega.Equal(timer2.Hash())) + gomega.Expect(timer1.Hash()).ShouldNot(gomega.Equal(timer3.Hash())) + + // Test Equal method + gomega.Expect(timer1.Equal(timer2)).Should(gomega.BeTrue()) + gomega.Expect(timer1.Equal(timer3)).Should(gomega.BeFalse()) + }) + + g.It("Should work with heap operations", func() { + timer1 := &internalTimer{ + w: timeWindow{start: 3000, end: 4000}, // Later + } + timer2 := &internalTimer{ + w: timeWindow{start: 1000, end: 2000}, // Earlier + } + timer3 := &internalTimer{ + w: timeWindow{start: 1000, end: 2000}, // Duplicate of timer2 + } + + // Initialize heap + heap.Init(timerHeap) + + // Push timers + heap.Push(timerHeap, timer1) + heap.Push(timerHeap, timer2) + heap.Push(timerHeap, timer3) // Should be 
deduplicated
+
+			// Should only have 2 items
+			gomega.Expect(timerHeap.Len()).Should(gomega.Equal(2))
+
+			// Pop should return earliest first
+			earliest := heap.Pop(timerHeap).(*internalTimer)
+			gomega.Expect(earliest.w.start).Should(gomega.Equal(int64(1000)))
+			gomega.Expect(earliest.w.end).Should(gomega.Equal(int64(2000)))
+
+			// Next should be the later timer
+			later := heap.Pop(timerHeap).(*internalTimer)
+			gomega.Expect(later.w.start).Should(gomega.Equal(int64(3000)))
+			gomega.Expect(later.w.end).Should(gomega.Equal(int64(4000)))
+
+			// Heap should be empty now
+			gomega.Expect(timerHeap.Len()).Should(gomega.Equal(0))
+		})
+	})
 })
diff --git a/pkg/flow/types.go b/pkg/flow/types.go
index 6e8576a48..20bf4c503 100644
--- a/pkg/flow/types.go
+++ b/pkg/flow/types.go
@@ -73,7 +73,7 @@ type Window interface {
 type WindowAssigner interface {
-	// AssignWindows assigns a slice of Window according to the given timestamp, e.g. eventTime.
+	// AssignWindows assigns a Window according to the given timestamp, e.g. eventTime.
 	// The unit of the timestamp here is MilliSecond.
-	AssignWindows(timestamp int64) ([]Window, error)
+	AssignWindows(timestamp int64) (Window, error)
 }
 
 // AggregationOp defines the stateful operation for aggregation.
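
Note for reviewers: below is a minimal, standalone sketch of how a caller
can opt into the content-based deduplication this patch adds to
DedupPriorityQueue. Only the Element/HashableElement contracts and the
NewPriorityQueue signature come from the patch; the task type, its fields,
and the multiplicative string hash are illustrative assumptions, and the
import path assumes the repository's module name.

	package main

	import (
		"container/heap"
		"fmt"

		"github.com/apache/skywalking-banyandb/pkg/flow"
	)

	// task is a hypothetical element keyed by name; two tasks with the same
	// name count as duplicates even when their priorities differ.
	type task struct {
		name     string
		priority int
		index    int
	}

	func (t *task) GetIndex() int    { return t.index }
	func (t *task) SetIndex(idx int) { t.index = idx }

	// Hash buckets candidate duplicates; Equal settles hash collisions by
	// comparing content.
	func (t *task) Hash() uint64 {
		var h uint64
		for _, b := range []byte(t.name) {
			h = h*31 + uint64(b)
		}
		return h
	}

	func (t *task) Equal(other flow.HashableElement) bool {
		o, ok := other.(*task)
		return ok && t.name == o.name
	}

	func main() {
		// allowDuplicates=false enables both reference- and content-based checks.
		pq := flow.NewPriorityQueue(func(a, b interface{}) int {
			return a.(*task).priority - b.(*task).priority
		}, false)

		heap.Push(pq, &task{name: "flush", priority: 2})
		heap.Push(pq, &task{name: "flush", priority: 5}) // same content: dropped
		heap.Push(pq, &task{name: "compact", priority: 1})

		fmt.Println(pq.Len()) // 2
	}

This is the same mechanism internalTimer now relies on: the timer heap
keeps one entry per window rather than one per element, which is what
bounds its growth.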
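A second note on the AssignWindows change: returning a single Window
instead of a []Window is sound because tumbling windows partition the
time axis, so each timestamp belongs to exactly one window. The body of
getWindowStart is not part of this diff; the sketch below assumes the
conventional alignment (start = timestamp - timestamp % windowSize), and
the windowStart name is hypothetical.

	package main

	import "fmt"

	// windowStart aligns a millisecond timestamp to the start of the tumbling
	// window that owns it (assumed behaviour of getWindowStart; not shown in
	// the diff).
	func windowStart(timestamp, windowSize int64) int64 {
		return timestamp - timestamp%windowSize
	}

	func main() {
		const size int64 = 15000 // e.g. 15s tumbling windows
		ts := int64(1758414448123)
		start := windowStart(ts, size)
		// Exactly one [start, start+size) window owns ts, so the assigner
		// can return it directly instead of wrapping it in a slice.
		fmt.Printf("window [%d, %d) owns %d\n", start, start+size, ts)
	}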