Skip to content

Commit ff7b771

Browse files
committed
Close the old connection to make sure the broker drops the producer on its side
1 parent 742f1b1 commit ff7b771

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

pulsar/producer_partition.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,15 @@ func (p *partitionProducer) reconnectToBroker() {
353353
p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
354354
return
355355
}
356+
357+
if p.cnx != nil {
358+
p.cnx.UnregisterListener(p.producerID)
359+
state := p.cnx.getState()
360+
if state == connectionClosing || state == connectionClosed {
361+
p.cnx.close()
362+
}
363+
}
364+
356365
errMsg := err.Error()
357366
if strings.Contains(errMsg, errTopicNotFount) {
358367
// when topic is deleted, we should give up reconnection.

0 commit comments

Comments
 (0)