Skip to content

Commit 04e17d0

Browse files
authored
Fix memory leaks and OOM issues in streaming processing (#777)
1 parent fab20d0 commit 04e17d0

File tree

9 files changed

+634
-67
lines changed

9 files changed

+634
-67
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ Release Notes.
5757
- Fix topN parsing panic when the criteria is set.
5858
- Remove the indexed_only field in TagSpec.
5959
- Fix returning empty result when using IN operatior on the array type tags.
60+
- Fix memory leaks and OOM issues in streaming processing by implementing deduplication logic in priority queues and improving sliding window memory management.
6061
- Fix etcd prefix matching any key that starts with this prefix.
6162

6263
### Document

banyand/measure/write_liaison.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
131131
// Send to all nodes for this shard
132132
for _, node := range nodes {
133133
message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
134-
_, publishErr := w.tire2Client.Publish(ctx, topic, message)
134+
future, publishErr := w.tire2Client.Publish(ctx, topic, message)
135135
if publishErr != nil {
136136
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
137+
continue
138+
}
139+
_, err := future.Get()
140+
if err != nil {
141+
w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
142+
continue
137143
}
138144
}
139145
}

banyand/queue/pub/pub.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,9 @@ type publishResult struct {
229229

230230
func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
231231
var err error
232-
f := &future{}
232+
f := &future{
233+
log: p.log,
234+
}
233235
handleMessage := func(m bus.Message, err error) error {
234236
r, errSend := messageToRequest(topic, m)
235237
if errSend != nil {
@@ -362,6 +364,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
362364
}
363365

364366
type future struct {
367+
log *logger.Logger
365368
clients []clusterv1.Service_SendClient
366369
cancelFn []func()
367370
topics []bus.Topic
@@ -372,10 +375,16 @@ func (l *future) Get() (bus.Message, error) {
372375
if len(l.clients) < 1 {
373376
return bus.Message{}, io.EOF
374377
}
378+
375379
c := l.clients[0]
376380
t := l.topics[0]
377381
n := l.nodes[0]
382+
378383
defer func() {
384+
if err := c.CloseSend(); err != nil {
385+
l.log.Error().Err(err).Msg("failed to close send stream")
386+
}
387+
379388
l.clients = l.clients[1:]
380389
l.topics = l.topics[1:]
381390
l.cancelFn[0]()

banyand/stream/write_liaison.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
204204
// Send to all nodes for this shard
205205
for _, node := range nodes {
206206
message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
207-
_, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
207+
future, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
208208
if publishErr != nil {
209209
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
210+
continue
211+
}
212+
_, err := future.Get()
213+
if err != nil {
214+
w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
215+
continue
210216
}
211217
}
212218
}

pkg/flow/dedup_priority_queue.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,21 @@ type Element interface {
3131
SetIndex(int)
3232
}
3333

34+
// HashableElement represents an element that can be hashed and compared for equality.
35+
type HashableElement interface {
36+
Element
37+
// Hash returns a hash value for this element
38+
Hash() uint64
39+
// Equal compares this element with another for content equality
40+
Equal(HashableElement) bool
41+
}
42+
3443
// DedupPriorityQueue implements heap.Interface.
3544
// DedupPriorityQueue is not thread-safe.
3645
type DedupPriorityQueue struct {
3746
comparator utils.Comparator
3847
cache map[Element]struct{}
48+
hashCache map[uint64][]HashableElement // For content-based deduplication
3949
Items []Element
4050
allowDuplicates bool
4151
}
@@ -46,6 +56,7 @@ func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupP
4656
comparator: comparator,
4757
Items: make([]Element, 0),
4858
cache: make(map[Element]struct{}),
59+
hashCache: make(map[uint64][]HashableElement),
4960
allowDuplicates: allowDuplicates,
5061
}
5162
}
@@ -60,6 +71,9 @@ func (pq *DedupPriorityQueue) Less(i, j int) bool {
6071

6172
// Swap exchanges indexes of the items.
6273
func (pq *DedupPriorityQueue) Swap(i, j int) {
74+
if i < 0 || i >= len(pq.Items) || j < 0 || j >= len(pq.Items) {
75+
panic("index out of range in DedupPriorityQueue.Swap")
76+
}
6377
pq.Items[i], pq.Items[j] = pq.Items[j], pq.Items[i]
6478
pq.Items[i].SetIndex(i)
6579
pq.Items[j].SetIndex(j)
@@ -71,11 +85,29 @@ func (pq *DedupPriorityQueue) Push(x interface{}) {
7185
item := x.(Element)
7286
// if duplicates is not allowed
7387
if !pq.allowDuplicates {
74-
// use mutex to protect cache and items
75-
// check existence
88+
// Check for reference-based duplicates first
7689
if _, ok := pq.cache[item]; ok {
7790
return
7891
}
92+
93+
// Check for content-based duplicates if the item implements HashableElement
94+
if hashableItem, ok := item.(HashableElement); ok {
95+
hash := hashableItem.Hash()
96+
if existingItems, exists := pq.hashCache[hash]; exists {
97+
// Check if any existing item has the same content
98+
for _, existing := range existingItems {
99+
if hashableItem.Equal(existing) {
100+
return // Duplicate found, don't add
101+
}
102+
}
103+
// No duplicate found, add to hash cache
104+
pq.hashCache[hash] = append(pq.hashCache[hash], hashableItem)
105+
} else {
106+
// First item with this hash
107+
pq.hashCache[hash] = []HashableElement{hashableItem}
108+
}
109+
}
110+
79111
pq.cache[item] = struct{}{}
80112
}
81113
n := len(pq.Items)
@@ -90,6 +122,24 @@ func (pq *DedupPriorityQueue) Pop() interface{} {
90122
item := pq.Items[n-1]
91123
item.SetIndex(-1) // for safety
92124
delete(pq.cache, item)
125+
126+
// Clean up hash cache if item implements HashableElement
127+
if hashableItem, ok := item.(HashableElement); ok {
128+
hash := hashableItem.Hash()
129+
if existingItems, exists := pq.hashCache[hash]; exists {
130+
// Remove the specific item from the hash cache
131+
for i, existing := range existingItems {
132+
if hashableItem.Equal(existing) {
133+
pq.hashCache[hash] = append(existingItems[:i], existingItems[i+1:]...)
134+
if len(pq.hashCache[hash]) == 0 {
135+
delete(pq.hashCache, hash)
136+
}
137+
break
138+
}
139+
}
140+
}
141+
}
142+
93143
pq.Items = pq.Items[0 : n-1]
94144
return item
95145
}

0 commit comments

Comments
 (0)