From 17a989e0fe7254dcf5d715ca8ce0a4b229d6e8d8 Mon Sep 17 00:00:00 2001 From: Eric Shen Date: Thu, 22 May 2025 23:35:29 +0800 Subject: [PATCH 1/5] feat: support subscribeMode field on pulsar Signed-off-by: Eric Shen --- pubsub/pulsar/metadata.go | 1 + pubsub/pulsar/metadata.yaml | 11 +++++++- pubsub/pulsar/pulsar.go | 35 ++++++++++++++++++++++++ pubsub/pulsar/pulsar_test.go | 53 ++++++++++++++++++++++++++++++++++++ 4 files changed, 99 insertions(+), 1 deletion(-) diff --git a/pubsub/pulsar/metadata.go b/pubsub/pulsar/metadata.go index ee306328b1..b3f54c1336 100644 --- a/pubsub/pulsar/metadata.go +++ b/pubsub/pulsar/metadata.go @@ -39,6 +39,7 @@ type pulsarMetadata struct { ReceiverQueueSize int `mapstructure:"receiverQueueSize"` SubscriptionType string `mapstructure:"subscribeType"` SubscriptionInitialPosition string `mapstructure:"subscribeInitialPosition"` + SubscriptionMode string `mapstructure:"subscribeMode"` Token string `mapstructure:"token"` oauth2.ClientCredentialsMetadata `mapstructure:",squash"` } diff --git a/pubsub/pulsar/metadata.yaml b/pubsub/pulsar/metadata.yaml index 33069a369c..10b5af7b36 100644 --- a/pubsub/pulsar/metadata.yaml +++ b/pubsub/pulsar/metadata.yaml @@ -201,4 +201,13 @@ metadata: example: '"earliest"' url: title: "Pulsar SubscriptionInitialPosition" - url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionInitialPosition" \ No newline at end of file + url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionInitialPosition" + - name: subscribeMode + type: string + description: | + Subscription mode indicates the cursor belongs to "durable" type or "non_durable" type. + default: '"durable"' + example: '"durable"' + url: + title: "Pulsar SubscriptionMode" + url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionMode" \ No newline at end of file diff --git a/pubsub/pulsar/pulsar.go b/pubsub/pulsar/pulsar.go index ebdbcf4a3b..ad85aee465 100644 --- a/pubsub/pulsar/pulsar.go +++ b/pubsub/pulsar/pulsar.go @@ -99,6 +99,11 @@ const ( subscribePositionEarliest = "earliest" subscribePositionLatest = "latest" + + subscribeMode = "subscribeMode" + + subscribeModeDurable = "durable" + subscribeModeNonDurable = "non_durable" ) type ProcessMode string @@ -154,6 +159,11 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) { return nil, errors.New("invalid subscription initial position. Accepted values are `latest` and `earliest`") } + m.SubscriptionMode, err = parseSubscriptionMode(meta.Properties[subscribeMode]) + if err != nil { + return nil, errors.New("invalid subscription mode") + } + for k, v := range meta.Properties { switch { case strings.HasSuffix(k, topicJSONSchemaIdentifier): @@ -455,6 +465,30 @@ func getSubscribePosition(subsPositionStr string) pulsar.SubscriptionInitialPosi return subsPosition } +func parseSubscriptionMode(in string) (string, error) { + subsMode := strings.ToLower(in) + switch subsMode { + case subscribeModeDurable, subscribeModeNonDurable: + return subsMode, nil + case "": + return subscribeModeDurable, nil + default: + return "", fmt.Errorf("invalid subscription mode: %s", subsMode) + } +} + +func getSubscriptionMode(subsModeStr string) pulsar.SubscriptionMode { + var subsMode pulsar.SubscriptionMode + + switch subsModeStr { + case subscribeModeDurable: + subsMode = pulsar.Durable + case subscribeModeNonDurable: + subsMode = pulsar.NonDurable + } + return subsMode +} + func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { if p.closed.Load() { return errors.New("component is closed") @@ -474,6 +508,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han SubscriptionName: p.metadata.ConsumerID, Type: getSubscribeType(subscribeType), SubscriptionInitialPosition: getSubscribePosition(subscribeInitialPosition), + SubscriptionMode: getSubscriptionMode(subscribeMode), MessageChannel: channel, NackRedeliveryDelay: p.metadata.RedeliveryDelay, ReceiverQueueSize: p.metadata.ReceiverQueueSize, diff --git a/pubsub/pulsar/pulsar_test.go b/pubsub/pulsar/pulsar_test.go index 33e6e03907..e09b895080 100644 --- a/pubsub/pulsar/pulsar_test.go +++ b/pubsub/pulsar/pulsar_test.go @@ -170,6 +170,59 @@ func TestParsePulsarMetadataSubscriptionInitialPosition(t *testing.T) { } } +func TestParsePulsarMetadataSubscriptionMode(t *testing.T) { + tt := []struct { + name string + subscribeMode string + expected string + err bool + }{ + { + name: "test valid subscribe mode - durable", + subscribeMode: "durable", + expected: "durable", + err: false, + }, + { + name: "test valid subscribe mode - non_durable", + subscribeMode: "non_durable", + expected: "non_durable", + err: false, + }, + { + name: "test valid subscribe mode - empty", + subscribeMode: "", + expected: "durable", + err: false, + }, + { + name: "test invalid subscribe mode", + subscribeMode: "invalid", + err: true, + }, + } + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + m := pubsub.Metadata{} + + m.Properties = map[string]string{ + "host": "a", + "subscribeMode": tc.subscribeMode, + } + meta, err := parsePulsarMetadata(m) + + if tc.err { + require.Error(t, err) + assert.Nil(t, meta) + return + } + + require.NoError(t, err) + assert.Equal(t, tc.expected, meta.SubscriptionMode) + }) + } +} + func TestParsePulsarSchemaMetadata(t *testing.T) { t.Run("test json", func(t *testing.T) { m := pubsub.Metadata{} From ee3e7bd50af82ca366115d73aa51f9df276d6ec1 Mon Sep 17 00:00:00 2001 From: Eric Shen Date: Fri, 23 May 2025 10:00:40 +0800 Subject: [PATCH 2/5] update the metadata description Signed-off-by: Eric Shen --- pubsub/pulsar/metadata.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/pulsar/metadata.yaml b/pubsub/pulsar/metadata.yaml index 10b5af7b36..8277583055 100644 --- a/pubsub/pulsar/metadata.yaml +++ b/pubsub/pulsar/metadata.yaml @@ -205,7 +205,7 @@ metadata: - name: subscribeMode type: string description: | - Subscription mode indicates the cursor belongs to "durable" type or "non_durable" type. + Subscription mode indicates the cursor belongs to "durable" type or "non_durable" type, durable subscription retains messages and persists the current position. default: '"durable"' example: '"durable"' url: From ae5984738d3857973ec359e788c8f25138e4f31a Mon Sep 17 00:00:00 2001 From: Eric Shen Date: Wed, 28 May 2025 18:20:24 +0800 Subject: [PATCH 3/5] Update pubsub/pulsar/pulsar.go Co-authored-by: Mike Nguyen Signed-off-by: Eric Shen --- pubsub/pulsar/pulsar.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pubsub/pulsar/pulsar.go b/pubsub/pulsar/pulsar.go index ad85aee465..16987b5a81 100644 --- a/pubsub/pulsar/pulsar.go +++ b/pubsub/pulsar/pulsar.go @@ -478,17 +478,13 @@ func parseSubscriptionMode(in string) (string, error) { } func getSubscriptionMode(subsModeStr string) pulsar.SubscriptionMode { - var subsMode pulsar.SubscriptionMode - switch subsModeStr { - case subscribeModeDurable: - subsMode = pulsar.Durable case subscribeModeNonDurable: - subsMode = pulsar.NonDurable + return pulsar.NonDurable + default: + return pulsar.Durable } - return subsMode } - func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { if p.closed.Load() { return errors.New("component is closed") From bf064af9dcb9298f7c1ded9d3cc51f9ca49249ce Mon Sep 17 00:00:00 2001 From: Eric Shen Date: Sat, 31 May 2025 17:47:12 +0800 Subject: [PATCH 4/5] add more combination tests Signed-off-by: Eric Shen --- pubsub/pulsar/pulsar_test.go | 103 +++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/pubsub/pulsar/pulsar_test.go b/pubsub/pulsar/pulsar_test.go index e09b895080..7ade8c9fc9 100644 --- a/pubsub/pulsar/pulsar_test.go +++ b/pubsub/pulsar/pulsar_test.go @@ -223,6 +223,109 @@ func TestParsePulsarMetadataSubscriptionMode(t *testing.T) { } } +func TestParsePulsarMetadataSubscriptionCombination(t *testing.T) { + tt := []struct { + name string + subscribeType string + subscribeInitialPosition string + subscribeMode string + expectedType string + expectedInitialPosition string + expectedMode string + err bool + }{ + { + name: "test valid subscribe - default", + subscribeType: "", + subscribeInitialPosition: "", + subscribeMode: "", + expectedType: "shared", + expectedInitialPosition: "latest", + expectedMode: "durable", + err: false, + }, + { + name: "test valid subscribe - pass case 1", + subscribeType: "key_shared", + subscribeInitialPosition: "earliest", + subscribeMode: "non_durable", + expectedType: "key_shared", + expectedInitialPosition: "earliest", + expectedMode: "non_durable", + err: false, + }, + { + name: "test valid subscribe - pass case 2", + subscribeType: "exclusive", + subscribeInitialPosition: "latest", + subscribeMode: "durable", + expectedType: "exclusive", + expectedInitialPosition: "latest", + expectedMode: "durable", + err: false, + }, + { + name: "test valid subscribe - pass case 3", + subscribeType: "failover", + subscribeInitialPosition: "earliest", + subscribeMode: "durable", + expectedType: "failover", + expectedInitialPosition: "earliest", + expectedMode: "durable", + err: false, + }, + { + name: "test valid subscribe - pass case 4", + subscribeType: "shared", + subscribeInitialPosition: "latest", + subscribeMode: "non_durable", + expectedType: "shared", + expectedInitialPosition: "latest", + expectedMode: "non_durable", + err: false, + }, + { + name: "test valid subscribe - fail case 1", + subscribeType: "invalid", + err: true, + }, + { + name: "test valid subscribe - fail case 2", + subscribeInitialPosition: "invalid", + err: true, + }, + { + name: "test valid subscribe - fail case 3", + subscribeMode: "invalid", + err: true, + }, + } + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + m := pubsub.Metadata{} + + m.Properties = map[string]string{ + "host": "a", + "subscribeType": tc.subscribeType, + "subscribeInitialPosition": tc.subscribeInitialPosition, + "subscribeMode": tc.subscribeMode, + } + meta, err := parsePulsarMetadata(m) + + if tc.err { + require.Error(t, err) + assert.Nil(t, meta) + return + } + + require.NoError(t, err) + assert.Equal(t, tc.expectedType, meta.SubscriptionType) + assert.Equal(t, tc.expectedInitialPosition, meta.SubscriptionInitialPosition) + assert.Equal(t, tc.expectedMode, meta.SubscriptionMode) + }) + } +} + func TestParsePulsarSchemaMetadata(t *testing.T) { t.Run("test json", func(t *testing.T) { m := pubsub.Metadata{} From 14f449dad86a9116d847825a5f30ea31f0070960 Mon Sep 17 00:00:00 2001 From: Eric Shen Date: Wed, 11 Jun 2025 20:13:04 +0800 Subject: [PATCH 5/5] fix lint Signed-off-by: Eric Shen --- pubsub/pulsar/pulsar.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pubsub/pulsar/pulsar.go b/pubsub/pulsar/pulsar.go index 16987b5a81..56c1d9b322 100644 --- a/pubsub/pulsar/pulsar.go +++ b/pubsub/pulsar/pulsar.go @@ -485,6 +485,7 @@ func getSubscriptionMode(subsModeStr string) pulsar.SubscriptionMode { return pulsar.Durable } } + func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { if p.closed.Load() { return errors.New("component is closed")