Skip to content

Commit ae4cce1

Browse files
committed
fix
1 parent 7893abe commit ae4cce1

File tree

2 files changed

+15
-8
lines changed

2 files changed

+15
-8
lines changed

internal/trace/queue_manager.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func newBatchQueueManager(o batchQueueManagerOptions) *BatchQueueManager {
5757
dropped: 0,
5858
batch: make([]interface{}, 0, o.maxExportBatchLength),
5959
batchMutex: sync.Mutex{},
60+
sizeMutex: sync.RWMutex{},
6061
timer: time.NewTimer(o.batchTimeout),
6162
exportFunc: o.exportFunc,
6263
stopWait: sync.WaitGroup{},
@@ -85,6 +86,7 @@ type BatchQueueManager struct {
8586
batch []interface{}
8687
batchByteSize int64
8788
batchMutex sync.Mutex
89+
sizeMutex sync.RWMutex
8890
timer *time.Timer
8991

9092
exportFunc func(ctx context.Context, s []interface{})
@@ -137,9 +139,12 @@ func (b *BatchQueueManager) isShouldExport() bool {
137139
if len(b.batch) >= b.o.maxExportBatchLength {
138140
return true
139141
}
142+
143+
b.sizeMutex.RLock()
140144
if b.batchByteSize >= int64(b.o.maxExportBatchByteSize) {
141145
return true
142146
}
147+
b.sizeMutex.RUnlock()
143148

144149
return false
145150
}
@@ -182,7 +187,9 @@ func (b *BatchQueueManager) doExport(ctx context.Context) {
182187
}
183188
// delete the batch
184189
b.batch = b.batch[:0]
190+
b.sizeMutex.Lock()
185191
b.batchByteSize = 0
192+
b.sizeMutex.Unlock()
186193
}
187194
}
188195

@@ -197,9 +204,9 @@ func (b *BatchQueueManager) Enqueue(ctx context.Context, sd interface{}, byteSiz
197204
var isFail bool
198205
select {
199206
case b.queue <- sd:
200-
b.batchMutex.Lock()
207+
b.sizeMutex.Lock()
201208
b.batchByteSize += byteSize
202-
b.batchMutex.Unlock()
209+
b.sizeMutex.Unlock()
203210
detailMsg = fmt.Sprintf("%s enqueue, queue length: %d", b.o.queueName, len(b.queue))
204211
default: // queue is full, not block, drop
205212
detailMsg = fmt.Sprintf("%s queue is full, dropped item", b.o.queueName)

internal/trace/span_processor.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ const (
3838
)
3939

4040
type QueueConf struct {
41-
SpanQueueLength int
42-
SpanQueueMaxExportBatchLength int
41+
SpanQueueLength int
42+
SpanMaxExportBatchLength int
4343
}
4444

4545
var _ SpanProcessor = (*BatchSpanProcessor)(nil)
@@ -79,13 +79,13 @@ func NewBatchSpanProcessor(
7979
exporter = ex
8080
}
8181
var spanQueueLength = DefaultMaxQueueLength
82-
var spanQueueMaxExportBatchLength = DefaultMaxExportBatchLength
82+
var spanMaxExportBatchLength = DefaultMaxExportBatchLength
8383
if queueConf != nil {
8484
if queueConf.SpanQueueLength > 0 {
8585
spanQueueLength = queueConf.SpanQueueLength
8686
}
87-
if queueConf.SpanQueueMaxExportBatchLength > 0 { // todo: need max limit
88-
spanQueueMaxExportBatchLength = queueConf.SpanQueueMaxExportBatchLength
87+
if queueConf.SpanMaxExportBatchLength > 0 { // todo: need max limit
88+
spanMaxExportBatchLength = queueConf.SpanMaxExportBatchLength
8989
}
9090
}
9191

@@ -126,7 +126,7 @@ func NewBatchSpanProcessor(
126126
queueName: queueNameSpan,
127127
batchTimeout: time.Duration(DefaultScheduleDelay) * time.Millisecond,
128128
maxQueueLength: spanQueueLength,
129-
maxExportBatchLength: spanQueueMaxExportBatchLength,
129+
maxExportBatchLength: spanMaxExportBatchLength,
130130
maxExportBatchByteSize: DefaultMaxExportBatchByteSize,
131131
exportFunc: newExportSpansFunc(exporter, spanRetryQM, fileQM, finishEventProcessor),
132132
finishEventProcessor: finishEventProcessor,

0 commit comments

Comments
 (0)