@@ -304,6 +304,7 @@ type pendingItem struct {
 	batchData    []byte
 	sequenceID   uint64
 	sendRequests []interface{}
+	completed    bool
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
@@ -329,6 +330,19 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
+	// lock the pending request while adding requests
+	// since the ReceivedSendReceipt func iterates over this list
+	pi.Lock()
+	defer pi.Unlock()
+
+	if pi.completed {
+		// The last item in the queue has been completed while we were
+		// looking at it. It's safe at this point to assume that every
+		// message enqueued before Flush() was called is now persisted
+		fr.waitGroup.Done()
+		return
+	}
+
 	sendReq := &sendRequest{
 		msg: nil,
 		callback: func(id MessageID, message *ProducerMessage, e error) {
@@ -337,11 +351,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		},
 	}
 
-	// lock the pending request while adding requests
-	// since the ReceivedSendReceipt func iterates over this list
-	pi.Lock()
 	pi.sendRequests = append(pi.sendRequests, sendReq)
-	pi.Unlock()
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
@@ -422,6 +432,9 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 			sr.callback(msgID, sr.msg, nil)
 		}
 	}
+
+	// Mark this pending item as done
+	pi.completed = true
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
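
For context, a minimal caller-side sketch of the guarantee this change protects (assumed usage, not part of the diff; the broker URL, topic name, and message payloads are placeholders): Flush() must not return until every message enqueued before the call has been acknowledged, even if the last pending item is acknowledged while internalFlush is still inspecting it, which is the race the completed flag closes.

// Sketch only: assumes a running Pulsar broker and the pulsar-client-go
// Producer API (SendAsync, Flush) whose callback signature appears in this diff.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Enqueue messages asynchronously; receipts may arrive at any time,
	// including while Flush() below is peeking at the last pending item.
	for i := 0; i < 100; i++ {
		producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("msg-%d", i)),
		}, func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
			if err != nil {
				log.Printf("send failed: %v", err)
			}
		})
	}

	// Flush blocks until everything enqueued above is acknowledged by the
	// broker; the completed flag lets it return immediately when the last
	// pending item was already acknowledged.
	if err := producer.Flush(); err != nil {
		log.Printf("flush failed: %v", err)
	}
}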