Skip to content

Commit 3700df9

Browse files
passuiedmikeee
authored andcommitted
Allow specifying the Consumer Group rebalancing strategy from metadata (dapr#3855)
Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
1 parent c986c44 commit 3700df9

File tree

5 files changed

+157
-25
lines changed

5 files changed

+157
-25
lines changed

bindings/kafka/metadata.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,3 +361,14 @@ metadata:
361361
The default is none.
362362
example: '"gzip"'
363363
default: "none"
364+
- name: consumerGroupRebalanceStrategy
365+
type: string
366+
required: false
367+
description: |
368+
The strategy to use for consumer group rebalancing.
369+
example: '"sticky"'
370+
default: '"range"'
371+
allowedValues:
372+
- "range"
373+
- "sticky"
374+
- "roundrobin"

common/component/kafka/kafka.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
152152
config.Consumer.Fetch.Default = meta.consumerFetchDefault
153153
config.Consumer.Group.Heartbeat.Interval = meta.HeartbeatInterval
154154
config.Consumer.Group.Session.Timeout = meta.SessionTimeout
155+
k.initConsumerGroupRebalanceStrategy(config, metadata)
155156
config.ChannelBufferSize = meta.channelBufferSize
156157

157158
config.Producer.Compression = meta.internalCompression
@@ -260,6 +261,25 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
260261
return nil
261262
}
262263

264+
func (k *Kafka) initConsumerGroupRebalanceStrategy(config *sarama.Config, metadata map[string]string) {
265+
consumerGroupRebalanceStrategy, ok := kitmd.GetMetadataProperty(metadata, "consumerGroupRebalanceStrategy")
266+
if !ok {
267+
consumerGroupRebalanceStrategy = consumerGroupRebalanceStrategyRange
268+
}
269+
switch strings.ToLower(consumerGroupRebalanceStrategy) {
270+
case consumerGroupRebalanceStrategySticky:
271+
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
272+
case consumerGroupRebalanceStrategyRoundRobin:
273+
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
274+
case consumerGroupRebalanceStrategyRange:
275+
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
276+
default:
277+
k.logger.Warnf("Invalid consumer group rebalance strategy: %s. Using default strategy: '%s'", consumerGroupRebalanceStrategy, consumerGroupRebalanceStrategyRange)
278+
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
279+
}
280+
k.logger.Infof("Consumer group rebalance strategy set to '%s'", config.Consumer.Group.Rebalance.GroupStrategies[0].Name())
281+
}
282+
263283
func (k *Kafka) ValidateAWS(metadata map[string]string) (*awsAuth.DeprecatedKafkaIAM, error) {
264284
const defaultSessionName = "DaprDefaultSession"
265285
// This is needed as we remove the aws prefixed fields to use the builtin AWS profile fields instead.

common/component/kafka/kafka_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,3 +432,89 @@ func TestValidateAWS(t *testing.T) {
432432
})
433433
}
434434
}
435+
436+
func TestInitConsumerGroupRebalanceStrategy(t *testing.T) {
437+
tests := []struct {
438+
name string
439+
metadata map[string]string
440+
expectedStrategy string
441+
}{
442+
{
443+
name: "missing consumerGroupRebalanceStrategy property defaults to Range",
444+
metadata: map[string]string{},
445+
expectedStrategy: "range",
446+
},
447+
{
448+
name: "empty consumerGroupRebalanceStrategy property defaults to Range",
449+
metadata: map[string]string{
450+
"consumerGroupRebalanceStrategy": "",
451+
},
452+
expectedStrategy: "range",
453+
},
454+
{
455+
name: "valid sticky strategy",
456+
metadata: map[string]string{
457+
"consumerGroupRebalanceStrategy": "sticky",
458+
},
459+
expectedStrategy: "sticky",
460+
},
461+
{
462+
name: "valid roundrobin strategy",
463+
metadata: map[string]string{
464+
"consumerGroupRebalanceStrategy": "roundrobin",
465+
},
466+
expectedStrategy: "roundrobin",
467+
},
468+
{
469+
name: "valid range strategy",
470+
metadata: map[string]string{
471+
"consumerGroupRebalanceStrategy": "range",
472+
},
473+
expectedStrategy: "range",
474+
},
475+
{
476+
name: "case insensitive strategy",
477+
metadata: map[string]string{
478+
"consumerGroupRebalanceStrategy": "sTickY",
479+
},
480+
expectedStrategy: "sticky",
481+
},
482+
{
483+
name: "case insensitive strategy default",
484+
metadata: map[string]string{
485+
"consumerGroupRebalanceStrategy": "Range",
486+
},
487+
expectedStrategy: "range",
488+
},
489+
{
490+
name: "invalid strategy defaults to Range with warning",
491+
metadata: map[string]string{
492+
"consumerGroupRebalanceStrategy": "invalid",
493+
},
494+
expectedStrategy: "range",
495+
},
496+
}
497+
498+
for _, tt := range tests {
499+
t.Run(tt.name, func(t *testing.T) {
500+
// Create Kafka instance with logger
501+
k := &Kafka{
502+
logger: logger.NewLogger("kafka_test"),
503+
}
504+
505+
// Create sarama config
506+
config := sarama.NewConfig()
507+
508+
// Call the method under test
509+
k.initConsumerGroupRebalanceStrategy(config, tt.metadata)
510+
511+
// Verify the strategy was set correctly
512+
require.Len(t, config.Consumer.Group.Rebalance.GroupStrategies, 1, "Expected exactly one rebalance strategy")
513+
514+
assert.Equal(t, tt.expectedStrategy, config.Consumer.Group.Rebalance.GroupStrategies[0].Name())
515+
516+
// Note: Warning verification would require a more sophisticated mock
517+
// For now, we just verify the strategy is set correctly
518+
})
519+
}
520+
}

common/component/kafka/metadata.go

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,32 @@ import (
2727
)
2828

2929
const (
30-
key = "partitionKey"
31-
keyMetadataKey = "__key"
32-
timestampMetadataKey = "__timestamp"
33-
offsetMetadataKey = "__offset"
34-
partitionMetadataKey = "__partition"
35-
topicMetadataKey = "__topic"
36-
skipVerify = "skipVerify"
37-
caCert = "caCert"
38-
certificateAuthType = "certificate"
39-
clientCert = "clientCert"
40-
clientKey = "clientKey"
41-
consumeRetryInterval = "consumeRetryInterval"
42-
authType = "authType"
43-
passwordAuthType = "password"
44-
oidcAuthType = "oidc"
45-
mtlsAuthType = "mtls"
46-
awsIAMAuthType = "awsiam"
47-
noAuthType = "none"
48-
consumerFetchMin = "consumerFetchMin"
49-
consumerFetchDefault = "consumerFetchDefault"
50-
channelBufferSize = "channelBufferSize"
51-
valueSchemaType = "valueSchemaType"
52-
compression = "compression"
30+
key = "partitionKey"
31+
keyMetadataKey = "__key"
32+
timestampMetadataKey = "__timestamp"
33+
offsetMetadataKey = "__offset"
34+
partitionMetadataKey = "__partition"
35+
topicMetadataKey = "__topic"
36+
skipVerify = "skipVerify"
37+
caCert = "caCert"
38+
certificateAuthType = "certificate"
39+
clientCert = "clientCert"
40+
clientKey = "clientKey"
41+
consumeRetryInterval = "consumeRetryInterval"
42+
authType = "authType"
43+
passwordAuthType = "password"
44+
oidcAuthType = "oidc"
45+
mtlsAuthType = "mtls"
46+
awsIAMAuthType = "awsiam"
47+
noAuthType = "none"
48+
consumerFetchMin = "consumerFetchMin"
49+
consumerFetchDefault = "consumerFetchDefault"
50+
channelBufferSize = "channelBufferSize"
51+
valueSchemaType = "valueSchemaType"
52+
compression = "compression"
53+
consumerGroupRebalanceStrategyRange = "range"
54+
consumerGroupRebalanceStrategySticky = "sticky"
55+
consumerGroupRebalanceStrategyRoundRobin = "roundrobin"
5356

5457
// Kafka client config default values.
5558
// Refresh interval < keep alive time so that way connection can be kept alive indefinitely if desired.
@@ -99,8 +102,9 @@ type KafkaMetadata struct {
99102

100103
channelBufferSize int `mapstructure:"-"`
101104

102-
consumerFetchMin int32 `mapstructure:"-"`
103-
consumerFetchDefault int32 `mapstructure:"-"`
105+
consumerFetchMin int32 `mapstructure:"-"`
106+
consumerFetchDefault int32 `mapstructure:"-"`
107+
ConsumerGroupRebalanceStrategy string `mapstructure:"consumerGroupRebalanceStrategy"`
104108

105109
// configs for kafka producer
106110
Compression string `mapstructure:"compression"`

pubsub/kafka/metadata.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,3 +352,14 @@ metadata:
352352
The default is none.
353353
example: '"gzip"'
354354
default: "none"
355+
- name: consumerGroupRebalanceStrategy
356+
type: string
357+
required: false
358+
description: |
359+
The strategy to use for consumer group rebalancing.
360+
example: '"sticky"'
361+
default: '"range"'
362+
allowedValues:
363+
- "range"
364+
- "sticky"
365+
- "roundrobin"

0 commit comments

Comments
 (0)