Skip to content

Commit a575681

Browse files
authored
Auto update the client to handle changes in number of partitions (apache#221)
* Auto update the client to handle changes in number of partitions * Fixed linter stuff * Fixed locking of producer/consumer list when updating partitions * Keep the lock during the partition update operation * Removed empty line * Fixed locking in producer.send()
1 parent 6e5c7d3 commit a575681

File tree

6 files changed

+269
-52
lines changed

6 files changed

+269
-52
lines changed

pulsar/consumer_impl.go

Lines changed: 100 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@ type acker interface {
4141
}
4242

4343
type consumer struct {
44-
client *client
45-
options ConsumerOptions
46-
consumers []*partitionConsumer
44+
sync.Mutex
45+
topic string
46+
client *client
47+
options ConsumerOptions
48+
consumers []*partitionConsumer
49+
consumerName string
4750

4851
// channel used to deliver message to clients
4952
messageCh chan ConsumerMessage
@@ -52,6 +55,7 @@ type consumer struct {
5255
closeOnce sync.Once
5356
closeCh chan struct{}
5457
errorCh chan error
58+
ticker *time.Ticker
5559

5660
log *log.Entry
5761
}
@@ -118,9 +122,11 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
118122
return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
119123
}
120124

121-
func internalTopicSubscribe(client *client, options ConsumerOptions, topic string,
125+
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
122126
messageCh chan ConsumerMessage, dlq *dlqRouter) (*consumer, error) {
127+
123128
consumer := &consumer{
129+
topic: topic,
124130
client: client,
125131
options: options,
126132
messageCh: messageCh,
@@ -130,56 +136,108 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
130136
log: log.WithField("topic", topic),
131137
}
132138

133-
partitions, err := client.TopicPartitions(topic)
139+
if options.Name != "" {
140+
consumer.consumerName = options.Name
141+
} else {
142+
consumer.consumerName = generateRandomName()
143+
}
144+
145+
err := consumer.internalTopicSubscribeToPartitions()
134146
if err != nil {
135147
return nil, err
136148
}
137149

138-
numPartitions := len(partitions)
139-
consumer.consumers = make([]*partitionConsumer, numPartitions)
150+
// set up timer to monitor for new partitions being added
151+
duration := options.AutoDiscoveryPeriod
152+
if duration <= 0 {
153+
duration = defaultAutoDiscoveryDuration
154+
}
155+
consumer.ticker = time.NewTicker(duration)
156+
157+
go func() {
158+
for range consumer.ticker.C {
159+
consumer.log.Debug("Auto discovering new partitions")
160+
consumer.internalTopicSubscribeToPartitions()
161+
}
162+
}()
163+
164+
return consumer, nil
165+
}
166+
167+
func (c *consumer) internalTopicSubscribeToPartitions() error {
168+
partitions, err := c.client.TopicPartitions(c.topic)
169+
if err != nil {
170+
return err
171+
}
172+
173+
oldNumPartitions := 0
174+
newNumPartitions := len(partitions)
175+
176+
c.Lock()
177+
defer c.Unlock()
178+
oldConsumers := c.consumers
179+
180+
if oldConsumers != nil {
181+
oldNumPartitions = len(oldConsumers)
182+
if oldNumPartitions == newNumPartitions {
183+
c.log.Debug("Number of partitions in topic has not changed")
184+
return nil
185+
}
186+
187+
c.log.WithField("old_partitions", oldNumPartitions).
188+
WithField("new_partitions", newNumPartitions).
189+
Info("Changed number of partitions in topic")
190+
}
191+
192+
c.consumers = make([]*partitionConsumer, newNumPartitions)
193+
194+
// Copy over the existing consumer instances
195+
for i := 0; i < oldNumPartitions; i++ {
196+
c.consumers[i] = oldConsumers[i]
197+
}
140198

141199
type ConsumerError struct {
142200
err error
143201
partition int
144202
consumer *partitionConsumer
145203
}
146204

147-
consumerName := options.Name
148-
if consumerName == "" {
149-
consumerName = generateRandomName()
150-
}
205+
receiverQueueSize := c.options.ReceiverQueueSize
206+
metadata := c.options.Properties
151207

152-
receiverQueueSize := options.ReceiverQueueSize
153-
metadata := options.Properties
208+
partitionsToAdd := newNumPartitions - oldNumPartitions
154209
var wg sync.WaitGroup
155-
ch := make(chan ConsumerError, numPartitions)
156-
wg.Add(numPartitions)
157-
for partitionIdx, partitionTopic := range partitions {
210+
ch := make(chan ConsumerError, partitionsToAdd)
211+
wg.Add(partitionsToAdd)
212+
213+
for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
214+
partitionTopic := partitions[partitionIdx]
215+
158216
go func(idx int, pt string) {
159217
defer wg.Done()
160218

161219
var nackRedeliveryDelay time.Duration
162-
if options.NackRedeliveryDelay == 0 {
220+
if c.options.NackRedeliveryDelay == 0 {
163221
nackRedeliveryDelay = defaultNackRedeliveryDelay
164222
} else {
165-
nackRedeliveryDelay = options.NackRedeliveryDelay
223+
nackRedeliveryDelay = c.options.NackRedeliveryDelay
166224
}
167225
opts := &partitionConsumerOpts{
168226
topic: pt,
169-
consumerName: consumerName,
170-
subscription: options.SubscriptionName,
171-
subscriptionType: options.Type,
172-
subscriptionInitPos: options.SubscriptionInitialPosition,
227+
consumerName: c.consumerName,
228+
subscription: c.options.SubscriptionName,
229+
subscriptionType: c.options.Type,
230+
subscriptionInitPos: c.options.SubscriptionInitialPosition,
173231
partitionIdx: idx,
174232
receiverQueueSize: receiverQueueSize,
175233
nackRedeliveryDelay: nackRedeliveryDelay,
176234
metadata: metadata,
177-
replicateSubscriptionState: options.ReplicateSubscriptionState,
235+
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
178236
startMessageID: nil,
179237
subscriptionMode: durable,
180-
readCompacted: options.ReadCompacted,
238+
readCompacted: c.options.ReadCompacted,
181239
}
182-
cons, err := newPartitionConsumer(consumer, client, opts, messageCh, dlq)
240+
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq)
183241
ch <- ConsumerError{
184242
err: err,
185243
partition: idx,
@@ -197,34 +255,37 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
197255
if ce.err != nil {
198256
err = ce.err
199257
} else {
200-
consumer.consumers[ce.partition] = ce.consumer
258+
c.consumers[ce.partition] = ce.consumer
201259
}
202260
}
203261

204262
if err != nil {
205263
// Since there were some failures,
206264
// cleanup all the partitions that succeeded in creating the consumer
207-
for _, c := range consumer.consumers {
265+
for _, c := range c.consumers {
208266
if c != nil {
209267
c.Close()
210268
}
211269
}
212-
return nil, err
270+
return err
213271
}
214272

215-
return consumer, nil
273+
return nil
216274
}
217275

218276
func topicSubscribe(client *client, options ConsumerOptions, topic string,
219277
messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) {
220-
return internalTopicSubscribe(client, options, topic, messageCh, dlqRouter)
278+
return newInternalConsumer(client, options, topic, messageCh, dlqRouter)
221279
}
222280

223281
func (c *consumer) Subscription() string {
224282
return c.options.SubscriptionName
225283
}
226284

227285
func (c *consumer) Unsubscribe() error {
286+
c.Lock()
287+
defer c.Unlock()
288+
228289
var errMsg string
229290
for _, consumer := range c.consumers {
230291
if err := consumer.Unsubscribe(); err != nil {
@@ -298,6 +359,9 @@ func (c *consumer) NackID(msgID MessageID) {
298359

299360
func (c *consumer) Close() {
300361
c.closeOnce.Do(func() {
362+
c.Lock()
363+
defer c.Unlock()
364+
301365
var wg sync.WaitGroup
302366
for i := range c.consumers {
303367
wg.Add(1)
@@ -308,12 +372,16 @@ func (c *consumer) Close() {
308372
}
309373
wg.Wait()
310374
close(c.closeCh)
375+
c.ticker.Stop()
311376
c.client.handlers.Del(c)
312377
c.dlq.close()
313378
})
314379
}
315380

316381
func (c *consumer) Seek(msgID MessageID) error {
382+
c.Lock()
383+
defer c.Unlock()
384+
317385
if len(c.consumers) > 1 {
318386
return errors.New("for partition topic, seek command should perform on the individual partitions")
319387
}
@@ -327,6 +395,8 @@ func (c *consumer) Seek(msgID MessageID) error {
327395
}
328396

329397
func (c *consumer) SeekByTime(time time.Time) error {
398+
c.Lock()
399+
defer c.Unlock()
330400
if len(c.consumers) > 1 {
331401
return errors.New("for partition topic, seek command should perform on the individual partitions")
332402
}

pulsar/consumer_regex.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ func (c *regexConsumer) monitor() {
237237
case <-c.closeCh:
238238
return
239239
case <-c.ticker.C:
240-
log.Debug("Auto discovering topics")
240+
c.log.Debug("Auto discovering topics")
241241
if !c.closed() {
242242
c.discover()
243243
}
@@ -361,7 +361,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
361361
for _, t := range topics {
362362
go func(topic string) {
363363
defer wg.Done()
364-
c, err := internalTopicSubscribe(c, opts, topic, ch, dlq)
364+
c, err := newInternalConsumer(c, opts, topic, ch, dlq)
365365
consumerErrorCh <- consumerError{
366366
err: err,
367367
topic: topic,

pulsar/consumer_test.go

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"context"
2222
"fmt"
2323
"log"
24+
"net/http"
25+
"strconv"
2426
"testing"
2527
"time"
2628

@@ -326,7 +328,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
326328
topic := "persistent://public/default/testGetPartitions"
327329
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
328330

329-
makeHTTPCall(t, testURL, "64")
331+
makeHTTPCall(t, http.MethodPut, testURL, "64")
330332

331333
// create producer
332334
producer, err := client.CreateProducer(ProducerOptions{
@@ -410,7 +412,7 @@ func TestConsumerShared(t *testing.T) {
410412
topic := "persistent://public/default/testMultiPartitionConsumerShared"
411413
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"
412414

413-
makeHTTPCall(t, testURL, "3")
415+
makeHTTPCall(t, http.MethodPut, testURL, "3")
414416

415417
sub := "sub-shared-1"
416418
consumer1, err := client.Subscribe(ConsumerOptions{
@@ -1207,3 +1209,68 @@ func TestGetDeliveryCount(t *testing.T) {
12071209
assert.Nil(t, err)
12081210
assert.Equal(t, uint32(3), msg.RedeliveryCount())
12091211
}
1212+
1213+
func TestConsumerAddTopicPartitions(t *testing.T) {
1214+
client, err := NewClient(ClientOptions{
1215+
URL: lookupURL,
1216+
})
1217+
assert.Nil(t, err)
1218+
defer client.Close()
1219+
1220+
topic := newTopicName()
1221+
testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
1222+
makeHTTPCall(t, http.MethodPut, testURL, "3")
1223+
1224+
// create producer
1225+
partitionsAutoDiscoveryInterval = 100 * time.Millisecond
1226+
producer, err := client.CreateProducer(ProducerOptions{
1227+
Topic: topic,
1228+
MessageRouter: func(msg *ProducerMessage, topicMetadata TopicMetadata) int {
1229+
// The message key will contain the partition id where to route
1230+
i, err := strconv.Atoi(msg.Key)
1231+
assert.NoError(t, err)
1232+
return i
1233+
},
1234+
})
1235+
assert.Nil(t, err)
1236+
defer producer.Close()
1237+
1238+
consumer, err := client.Subscribe(ConsumerOptions{
1239+
Topic: topic,
1240+
SubscriptionName: "my-sub",
1241+
AutoDiscoveryPeriod: 100 * time.Millisecond,
1242+
})
1243+
assert.Nil(t, err)
1244+
defer consumer.Close()
1245+
1246+
// Increase number of partitions to 10
1247+
makeHTTPCall(t, http.MethodPost, testURL, "10")
1248+
1249+
// Wait for the producer/consumers to pick up the change
1250+
time.Sleep(1 * time.Second)
1251+
1252+
// Publish messages ensuring that they will go to all the partitions
1253+
ctx := context.Background()
1254+
for i := 0; i < 10; i++ {
1255+
_, err := producer.Send(ctx, &ProducerMessage{
1256+
Key: fmt.Sprintf("%d", i),
1257+
Payload: []byte(fmt.Sprintf("hello-%d", i)),
1258+
})
1259+
assert.Nil(t, err)
1260+
}
1261+
1262+
msgs := make([]string, 0)
1263+
1264+
for i := 0; i < 10; i++ {
1265+
msg, err := consumer.Receive(ctx)
1266+
assert.Nil(t, err)
1267+
msgs = append(msgs, string(msg.Payload()))
1268+
1269+
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
1270+
msg.ID(), string(msg.Payload()))
1271+
1272+
consumer.Ack(msg)
1273+
}
1274+
1275+
assert.Equal(t, len(msgs), 10)
1276+
}

0 commit comments

Comments
 (0)