Skip to content

Commit 189fddd

Browse files
ericsyhmikeeedaixiang0nelson-parenteEric Shen
committed
feat: support subscribeMode field on pulsar (dapr#3831)
Signed-off-by: Eric Shen <ericshenyuhao@outlook.com> Signed-off-by: Eric Shen <ershen@ebay.com> Co-authored-by: Mike Nguyen <hey@mike.ee> Co-authored-by: Loong Dai <long0dai@foxmail.com> Co-authored-by: Nelson Parente <nelson_parente@live.com.pt> Co-authored-by: Eric Shen <ershen@ebay.com> Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
1 parent ada2d55 commit 189fddd

File tree

4 files changed

+199
-1
lines changed

4 files changed

+199
-1
lines changed

pubsub/pulsar/metadata.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type pulsarMetadata struct {
3939
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`
4040
SubscriptionType string `mapstructure:"subscribeType"`
4141
SubscriptionInitialPosition string `mapstructure:"subscribeInitialPosition"`
42+
SubscriptionMode string `mapstructure:"subscribeMode"`
4243
Token string `mapstructure:"token"`
4344
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
4445
}

pubsub/pulsar/metadata.yaml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,4 +201,13 @@ metadata:
201201
example: '"earliest"'
202202
url:
203203
title: "Pulsar SubscriptionInitialPosition"
204-
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionInitialPosition"
204+
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionInitialPosition"
205+
- name: subscribeMode
206+
type: string
207+
description: |
208+
Subscription mode indicates the cursor belongs to "durable" type or "non_durable" type, durable subscription retains messages and persists the current position.
209+
default: '"durable"'
210+
example: '"durable"'
211+
url:
212+
title: "Pulsar SubscriptionMode"
213+
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionMode"

pubsub/pulsar/pulsar.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ const (
9999

100100
subscribePositionEarliest = "earliest"
101101
subscribePositionLatest = "latest"
102+
103+
subscribeMode = "subscribeMode"
104+
105+
subscribeModeDurable = "durable"
106+
subscribeModeNonDurable = "non_durable"
102107
)
103108

104109
type ProcessMode string
@@ -154,6 +159,11 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
154159
return nil, errors.New("invalid subscription initial position. Accepted values are `latest` and `earliest`")
155160
}
156161

162+
m.SubscriptionMode, err = parseSubscriptionMode(meta.Properties[subscribeMode])
163+
if err != nil {
164+
return nil, errors.New("invalid subscription mode")
165+
}
166+
157167
for k, v := range meta.Properties {
158168
switch {
159169
case strings.HasSuffix(k, topicJSONSchemaIdentifier):
@@ -455,6 +465,27 @@ func getSubscribePosition(subsPositionStr string) pulsar.SubscriptionInitialPosi
455465
return subsPosition
456466
}
457467

468+
func parseSubscriptionMode(in string) (string, error) {
469+
subsMode := strings.ToLower(in)
470+
switch subsMode {
471+
case subscribeModeDurable, subscribeModeNonDurable:
472+
return subsMode, nil
473+
case "":
474+
return subscribeModeDurable, nil
475+
default:
476+
return "", fmt.Errorf("invalid subscription mode: %s", subsMode)
477+
}
478+
}
479+
480+
func getSubscriptionMode(subsModeStr string) pulsar.SubscriptionMode {
481+
switch subsModeStr {
482+
case subscribeModeNonDurable:
483+
return pulsar.NonDurable
484+
default:
485+
return pulsar.Durable
486+
}
487+
}
488+
458489
func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
459490
if p.closed.Load() {
460491
return errors.New("component is closed")
@@ -474,6 +505,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
474505
SubscriptionName: p.metadata.ConsumerID,
475506
Type: getSubscribeType(subscribeType),
476507
SubscriptionInitialPosition: getSubscribePosition(subscribeInitialPosition),
508+
SubscriptionMode: getSubscriptionMode(subscribeMode),
477509
MessageChannel: channel,
478510
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
479511
ReceiverQueueSize: p.metadata.ReceiverQueueSize,

pubsub/pulsar/pulsar_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,162 @@ func TestParsePulsarMetadataSubscriptionInitialPosition(t *testing.T) {
170170
}
171171
}
172172

173+
func TestParsePulsarMetadataSubscriptionMode(t *testing.T) {
174+
tt := []struct {
175+
name string
176+
subscribeMode string
177+
expected string
178+
err bool
179+
}{
180+
{
181+
name: "test valid subscribe mode - durable",
182+
subscribeMode: "durable",
183+
expected: "durable",
184+
err: false,
185+
},
186+
{
187+
name: "test valid subscribe mode - non_durable",
188+
subscribeMode: "non_durable",
189+
expected: "non_durable",
190+
err: false,
191+
},
192+
{
193+
name: "test valid subscribe mode - empty",
194+
subscribeMode: "",
195+
expected: "durable",
196+
err: false,
197+
},
198+
{
199+
name: "test invalid subscribe mode",
200+
subscribeMode: "invalid",
201+
err: true,
202+
},
203+
}
204+
for _, tc := range tt {
205+
t.Run(tc.name, func(t *testing.T) {
206+
m := pubsub.Metadata{}
207+
208+
m.Properties = map[string]string{
209+
"host": "a",
210+
"subscribeMode": tc.subscribeMode,
211+
}
212+
meta, err := parsePulsarMetadata(m)
213+
214+
if tc.err {
215+
require.Error(t, err)
216+
assert.Nil(t, meta)
217+
return
218+
}
219+
220+
require.NoError(t, err)
221+
assert.Equal(t, tc.expected, meta.SubscriptionMode)
222+
})
223+
}
224+
}
225+
226+
func TestParsePulsarMetadataSubscriptionCombination(t *testing.T) {
227+
tt := []struct {
228+
name string
229+
subscribeType string
230+
subscribeInitialPosition string
231+
subscribeMode string
232+
expectedType string
233+
expectedInitialPosition string
234+
expectedMode string
235+
err bool
236+
}{
237+
{
238+
name: "test valid subscribe - default",
239+
subscribeType: "",
240+
subscribeInitialPosition: "",
241+
subscribeMode: "",
242+
expectedType: "shared",
243+
expectedInitialPosition: "latest",
244+
expectedMode: "durable",
245+
err: false,
246+
},
247+
{
248+
name: "test valid subscribe - pass case 1",
249+
subscribeType: "key_shared",
250+
subscribeInitialPosition: "earliest",
251+
subscribeMode: "non_durable",
252+
expectedType: "key_shared",
253+
expectedInitialPosition: "earliest",
254+
expectedMode: "non_durable",
255+
err: false,
256+
},
257+
{
258+
name: "test valid subscribe - pass case 2",
259+
subscribeType: "exclusive",
260+
subscribeInitialPosition: "latest",
261+
subscribeMode: "durable",
262+
expectedType: "exclusive",
263+
expectedInitialPosition: "latest",
264+
expectedMode: "durable",
265+
err: false,
266+
},
267+
{
268+
name: "test valid subscribe - pass case 3",
269+
subscribeType: "failover",
270+
subscribeInitialPosition: "earliest",
271+
subscribeMode: "durable",
272+
expectedType: "failover",
273+
expectedInitialPosition: "earliest",
274+
expectedMode: "durable",
275+
err: false,
276+
},
277+
{
278+
name: "test valid subscribe - pass case 4",
279+
subscribeType: "shared",
280+
subscribeInitialPosition: "latest",
281+
subscribeMode: "non_durable",
282+
expectedType: "shared",
283+
expectedInitialPosition: "latest",
284+
expectedMode: "non_durable",
285+
err: false,
286+
},
287+
{
288+
name: "test valid subscribe - fail case 1",
289+
subscribeType: "invalid",
290+
err: true,
291+
},
292+
{
293+
name: "test valid subscribe - fail case 2",
294+
subscribeInitialPosition: "invalid",
295+
err: true,
296+
},
297+
{
298+
name: "test valid subscribe - fail case 3",
299+
subscribeMode: "invalid",
300+
err: true,
301+
},
302+
}
303+
for _, tc := range tt {
304+
t.Run(tc.name, func(t *testing.T) {
305+
m := pubsub.Metadata{}
306+
307+
m.Properties = map[string]string{
308+
"host": "a",
309+
"subscribeType": tc.subscribeType,
310+
"subscribeInitialPosition": tc.subscribeInitialPosition,
311+
"subscribeMode": tc.subscribeMode,
312+
}
313+
meta, err := parsePulsarMetadata(m)
314+
315+
if tc.err {
316+
require.Error(t, err)
317+
assert.Nil(t, meta)
318+
return
319+
}
320+
321+
require.NoError(t, err)
322+
assert.Equal(t, tc.expectedType, meta.SubscriptionType)
323+
assert.Equal(t, tc.expectedInitialPosition, meta.SubscriptionInitialPosition)
324+
assert.Equal(t, tc.expectedMode, meta.SubscriptionMode)
325+
})
326+
}
327+
}
328+
173329
func TestParsePulsarSchemaMetadata(t *testing.T) {
174330
t.Run("test json", func(t *testing.T) {
175331
m := pubsub.Metadata{}

0 commit comments

Comments
 (0)