Skip to content

Commit 6edc8f4

Browse files
authored
Add flag to disable forced topic creation. (apache#226)
This flag is needed for the regex consumer.
1 parent 0f822ef commit 6edc8f4

File tree

3 files changed

+25
-16
lines changed

3 files changed

+25
-16
lines changed

pulsar/consumer_impl.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,12 @@ type acker interface {
4242

4343
type consumer struct {
4444
sync.Mutex
45-
topic string
46-
client *client
47-
options ConsumerOptions
48-
consumers []*partitionConsumer
49-
consumerName string
45+
topic string
46+
client *client
47+
options ConsumerOptions
48+
consumers []*partitionConsumer
49+
consumerName string
50+
disableForceTopicCreation bool
5051

5152
// channel used to deliver message to clients
5253
messageCh chan ConsumerMessage
@@ -123,17 +124,18 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
123124
}
124125

125126
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
126-
messageCh chan ConsumerMessage, dlq *dlqRouter) (*consumer, error) {
127+
messageCh chan ConsumerMessage, dlq *dlqRouter, disableForceTopicCreation bool) (*consumer, error) {
127128

128129
consumer := &consumer{
129-
topic: topic,
130-
client: client,
131-
options: options,
132-
messageCh: messageCh,
133-
closeCh: make(chan struct{}),
134-
errorCh: make(chan error),
135-
dlq: dlq,
136-
log: log.WithField("topic", topic),
130+
topic: topic,
131+
client: client,
132+
options: options,
133+
disableForceTopicCreation: disableForceTopicCreation,
134+
messageCh: messageCh,
135+
closeCh: make(chan struct{}),
136+
errorCh: make(chan error),
137+
dlq: dlq,
138+
log: log.WithField("topic", topic),
137139
}
138140

139141
if options.Name != "" {
@@ -275,7 +277,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
275277

276278
func topicSubscribe(client *client, options ConsumerOptions, topic string,
277279
messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) {
278-
return newInternalConsumer(client, options, topic, messageCh, dlqRouter)
280+
return newInternalConsumer(client, options, topic, messageCh, dlqRouter, false)
279281
}
280282

281283
func (c *consumer) Subscription() string {

pulsar/consumer_partition.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type partitionConsumerOpts struct {
7676
startMessageIDInclusive bool
7777
subscriptionMode subscriptionMode
7878
readCompacted bool
79+
disableForceTopicCreation bool
7980
}
8081

8182
type partitionConsumer struct {
@@ -748,6 +749,12 @@ func (pc *partitionConsumer) grabConn() error {
748749
cmdSubscribe.Metadata = toKeyValues(pc.options.metadata)
749750
}
750751

752+
// force topic creation is enabled by default so
753+
// we only need to set the flag when disabling it
754+
if pc.options.disableForceTopicCreation {
755+
cmdSubscribe.ForceTopicCreation = proto.Bool(false)
756+
}
757+
751758
res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
752759
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
753760

pulsar/consumer_regex.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
361361
for _, t := range topics {
362362
go func(topic string) {
363363
defer wg.Done()
364-
c, err := newInternalConsumer(c, opts, topic, ch, dlq)
364+
c, err := newInternalConsumer(c, opts, topic, ch, dlq, true)
365365
consumerErrorCh <- consumerError{
366366
err: err,
367367
topic: topic,

0 commit comments

Comments
 (0)