Skip to content

Commit cd75759

Browse files
author
Tim Middleton
committed
Topics progress - receive and commit
1 parent 9c08dfe commit cd75759

File tree

8 files changed

+630
-203
lines changed

8 files changed

+630
-203
lines changed

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ MVN_VERSION ?= 1.0.0
3737
SHELL := /bin/bash
3838

3939
# Coherence CE version to run base tests against
40-
COHERENCE_VERSION ?= 22.06.12
40+
COHERENCE_VERSION ?= 22.06.13
4141
COHERENCE_GROUP_ID ?= com.oracle.coherence.ce
4242
COHERENCE_WKA1 ?= server1
4343
COHERENCE_WKA2 ?= server1
@@ -225,8 +225,8 @@ ifeq ($(SKIP_PROTO_GENERATION),true)
225225
@echo "Skipping proto generation..."
226226
else
227227
mkdir -p $(PROTO_DIR) || true
228-
curl $(CURL_AUTH) -o $(PROTO_DIR)/services.proto https://raw.githubusercontent.com/oracle/coherence/22.06.12/prj/coherence-grpc/src/main/proto/services.proto
229-
curl $(CURL_AUTH) -o $(PROTO_DIR)/messages.proto https://raw.githubusercontent.com/oracle/coherence/22.06.12/prj/coherence-grpc/src/main/proto/messages.proto
228+
curl $(CURL_AUTH) -o $(PROTO_DIR)/services.proto https://raw.githubusercontent.com/oracle/coherence/22.06.13/prj/coherence-grpc/src/main/proto/services.proto
229+
curl $(CURL_AUTH) -o $(PROTO_DIR)/messages.proto https://raw.githubusercontent.com/oracle/coherence/22.06.13/prj/coherence-grpc/src/main/proto/messages.proto
230230
echo "" >> $(PROTO_DIR)/services.proto
231231
echo "" >> $(PROTO_DIR)/messages.proto
232232
echo 'option go_package = "github.com/oracle/coherence-go-client/proto";' >> $(PROTO_DIR)/services.proto

coherence/subscriber/subscriber.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"github.com/oracle/coherence-go-client/v2/coherence/filters"
1212
pb1topics "github.com/oracle/coherence-go-client/v2/proto/topics"
13+
"time"
1314
)
1415

1516
type ReceiveStatus int32
@@ -29,15 +30,28 @@ type EnsureSubscriberResult struct {
2930
UUID string
3031
}
3132

32-
type ReceiveResult[V any] struct {
33-
Status ReceiveStatus
34-
}
35-
3633
// Options provides options for creating a subscriber.
3734
type Options struct {
3835
SubscriberGroup *string
3936
Filter filters.Filter
4037
Extractor []byte
38+
AutoCommit bool
39+
MaxMessages int32
40+
}
41+
42+
// CommitResponse represents th response from a [Subscriber] commit.
43+
type CommitResponse struct {
44+
Channel int32
45+
Position *pb1topics.TopicPosition
46+
Head *pb1topics.TopicPosition
47+
}
48+
49+
// ReceiveResponse represents a response from a [Subscriber] receive request.
50+
type ReceiveResponse[V any] struct {
51+
Channel int32
52+
Value *V
53+
Position *pb1topics.TopicPosition
54+
Timestamp time.Time
4155
}
4256

4357
// TODO: Additional options
@@ -61,6 +75,20 @@ func WithFilter(fltr filters.Filter) func(options *Options) {
6175
}
6276
}
6377

78+
// WithAutoCommit returns a function to set auto commit to be true for a [Subscriber].
79+
func WithAutoCommit() func(options *Options) {
80+
return func(s *Options) {
81+
s.AutoCommit = true
82+
}
83+
}
84+
85+
// WithMaxMessages returns a function to set the maximum messages for a[Subscriber].
86+
func WithMaxMessages(maxMessages int32) func(options *Options) {
87+
return func(s *Options) {
88+
s.MaxMessages = maxMessages
89+
}
90+
}
91+
6492
// WithTransformer returns a function to set the extractor [Subscriber].
6593
func WithTransformer(extractor []byte) func(options *Options) {
6694
return func(s *Options) {

0 commit comments

Comments
 (0)