Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Release Notes.
- Fix topN parsing panic when the criteria is set.
- Remove the indexed_only field in TagSpec.
- Fix returning empty result when using IN operatior on the array type tags.
- Fix memory leaks and OOM issues in streaming processing by implementing deduplication logic in priority queues and improving sliding window memory management.
- Fix etcd prefix matching any key that starts with this prefix.

### Document
Expand Down
8 changes: 7 additions & 1 deletion banyand/measure/write_liaison.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
// Send to all nodes for this shard
for _, node := range nodes {
message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
_, publishErr := w.tire2Client.Publish(ctx, topic, message)
future, publishErr := w.tire2Client.Publish(ctx, topic, message)
if publishErr != nil {
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
continue
}
_, err := future.Get()
if err != nil {
w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
continue
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion banyand/queue/pub/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ type publishResult struct {

func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
var err error
f := &future{}
f := &future{
log: p.log,
}
handleMessage := func(m bus.Message, err error) error {
r, errSend := messageToRequest(topic, m)
if errSend != nil {
Expand Down Expand Up @@ -362,6 +364,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
}

type future struct {
log *logger.Logger
clients []clusterv1.Service_SendClient
cancelFn []func()
topics []bus.Topic
Expand All @@ -372,10 +375,16 @@ func (l *future) Get() (bus.Message, error) {
if len(l.clients) < 1 {
return bus.Message{}, io.EOF
}

c := l.clients[0]
t := l.topics[0]
n := l.nodes[0]

defer func() {
if err := c.CloseSend(); err != nil {
l.log.Error().Err(err).Msg("failed to close send stream")
}

l.clients = l.clients[1:]
l.topics = l.topics[1:]
l.cancelFn[0]()
Expand Down
8 changes: 7 additions & 1 deletion banyand/stream/write_liaison.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
// Send to all nodes for this shard
for _, node := range nodes {
message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
_, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
future, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
if publishErr != nil {
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
continue
}
_, err := future.Get()
if err != nil {
w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
continue
}
}
}
Expand Down
54 changes: 52 additions & 2 deletions pkg/flow/dedup_priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,21 @@ type Element interface {
SetIndex(int)
}

// HashableElement represents an element that can be hashed and compared for equality.
type HashableElement interface {
Element
// Hash returns a hash value for this element
Hash() uint64
// Equal compares this element with another for content equality
Equal(HashableElement) bool
}

// DedupPriorityQueue implements heap.Interface.
// DedupPriorityQueue is not thread-safe.
type DedupPriorityQueue struct {
comparator utils.Comparator
cache map[Element]struct{}
hashCache map[uint64][]HashableElement // For content-based deduplication
Items []Element
allowDuplicates bool
}
Expand All @@ -46,6 +56,7 @@ func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupP
comparator: comparator,
Items: make([]Element, 0),
cache: make(map[Element]struct{}),
hashCache: make(map[uint64][]HashableElement),
allowDuplicates: allowDuplicates,
}
}
Expand All @@ -60,6 +71,9 @@ func (pq *DedupPriorityQueue) Less(i, j int) bool {

// Swap exchanges indexes of the items.
func (pq *DedupPriorityQueue) Swap(i, j int) {
if i < 0 || i >= len(pq.Items) || j < 0 || j >= len(pq.Items) {
panic("index out of range in DedupPriorityQueue.Swap")
}
pq.Items[i], pq.Items[j] = pq.Items[j], pq.Items[i]
pq.Items[i].SetIndex(i)
pq.Items[j].SetIndex(j)
Expand All @@ -71,11 +85,29 @@ func (pq *DedupPriorityQueue) Push(x interface{}) {
item := x.(Element)
// if duplicates is not allowed
if !pq.allowDuplicates {
// use mutex to protect cache and items
// check existence
// Check for reference-based duplicates first
if _, ok := pq.cache[item]; ok {
return
}

// Check for content-based duplicates if the item implements HashableElement
if hashableItem, ok := item.(HashableElement); ok {
hash := hashableItem.Hash()
if existingItems, exists := pq.hashCache[hash]; exists {
// Check if any existing item has the same content
for _, existing := range existingItems {
if hashableItem.Equal(existing) {
return // Duplicate found, don't add
}
}
// No duplicate found, add to hash cache
pq.hashCache[hash] = append(pq.hashCache[hash], hashableItem)
} else {
// First item with this hash
pq.hashCache[hash] = []HashableElement{hashableItem}
}
}

pq.cache[item] = struct{}{}
}
n := len(pq.Items)
Expand All @@ -90,6 +122,24 @@ func (pq *DedupPriorityQueue) Pop() interface{} {
item := pq.Items[n-1]
item.SetIndex(-1) // for safety
delete(pq.cache, item)

// Clean up hash cache if item implements HashableElement
if hashableItem, ok := item.(HashableElement); ok {
hash := hashableItem.Hash()
if existingItems, exists := pq.hashCache[hash]; exists {
// Remove the specific item from the hash cache
for i, existing := range existingItems {
if hashableItem.Equal(existing) {
pq.hashCache[hash] = append(existingItems[:i], existingItems[i+1:]...)
if len(pq.hashCache[hash]) == 0 {
delete(pq.hashCache, hash)
}
break
}
}
}
}

pq.Items = pq.Items[0 : n-1]
return item
}
Expand Down
Loading
Loading