Skip to content

Commit c77c1d5

Browse files
author
Tim Middleton
committed
Additional changes/ tests
1 parent 6bc3a6b commit c77c1d5

File tree

10 files changed

+563
-153
lines changed

10 files changed

+563
-153
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ test-e2e-standalone-queues: test-clean test gotestsum $(BUILD_PROPS) ## Run e2e
449449
# ----------------------------------------------------------------------------------------------------------------------
450450
.PHONY: test-e2e-standalone-topics
451451
test-e2e-standalone-topics: test-clean test gotestsum $(BUILD_PROPS) ## Run e2e tests with Coherence queues
452-
cd test && CGO_ENABLED=0 $(GOTESTSUM) --format testname --junitfile $(TEST_LOGS_DIR)/go-client-test-queues.xml \
452+
cd test && CGO_ENABLED=0 $(GOTESTSUM) --format testname --junitfile $(TEST_LOGS_DIR)/go-client-test-topics.xml \
453453
-- $(GO_TEST_FLAGS) -v -coverprofile=$(COVERAGE_DIR)/cover-functional-topics.out -v ./e2e/topics/... -coverpkg=github.com/oracle/coherence-go-client/v2/coherence/...
454454
go tool cover -func=$(COVERAGE_DIR)/cover-functional-topics.out | grep -v '0.0%'
455455

coherence/session.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,8 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (
276276
// setLogLevel sets the log level from the COHERENCE_LOG_LEVEL environment variable.
277277
func setLogLevel(envLevel string) {
278278
var level int
279+
logLevelMutex.Lock()
280+
defer logLevelMutex.Unlock()
279281

280282
// try to convert from integer first
281283
if lvl, err := strconv.Atoi(envLevel); err == nil {

coherence/subscriber/subscriber.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Options struct {
3737
Extractor []byte
3838
AutoCommit bool
3939
MaxMessages int32
40+
Channels []int32
4041
}
4142

4243
// CommitResponse represents th response from a [Subscriber] commit.
@@ -82,13 +83,20 @@ func WithAutoCommit() func(options *Options) {
8283
}
8384
}
8485

85-
// WithMaxMessages returns a function to set the maximum messages for a[Subscriber].
86+
// WithMaxMessages returns a function to set the maximum messages for a [Subscriber].
8687
func WithMaxMessages(maxMessages int32) func(options *Options) {
8788
return func(s *Options) {
8889
s.MaxMessages = maxMessages
8990
}
9091
}
9192

93+
// WithChannels returns a function to set the channels for a [Subscriber].
94+
func WithChannels(channels []int32) func(options *Options) {
95+
return func(s *Options) {
96+
s.Channels = channels
97+
}
98+
}
99+
92100
// WithTransformer returns a function to set the extractor [Subscriber].
93101
func WithTransformer(extractor []byte) func(options *Options) {
94102
return func(s *Options) {
@@ -97,5 +105,10 @@ func WithTransformer(extractor []byte) func(options *Options) {
97105
}
98106

99107
func (o *Options) String() string {
100-
return fmt.Sprintf("options{SubscriberGroup=%v, filter=%v}", o.SubscriberGroup, o.Filter)
108+
var subGroup string
109+
if o.SubscriberGroup != nil {
110+
subGroup = *o.SubscriberGroup
111+
}
112+
return fmt.Sprintf("options{SubscriberGroup=%v, filter=%v, MaxMessages=%v, AutoCommit=%v, Transformer=%v, Channels=%v}",
113+
subGroup, o.Filter, o.MaxMessages, o.AutoCommit, o.Extractor, o.Channels)
101114
}

coherence/topics.go

Lines changed: 99 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ type NamedTopic[V any] interface {
7171
CreateSubscriberGroup(ctx context.Context, subscriberGroup string, options ...func(o *subscribergroup.Options)) error
7272

7373
// DestroySubscriberGroup destroys a subscriber group.
74-
// TODO: Not viable to implement?
7574
DestroySubscriberGroup(ctx context.Context, subscriberGroup string) error
7675

7776
// AddLifecycleListener adds a [TopicLifecycleListener] to this topic.
@@ -114,9 +113,13 @@ type Subscriber[V any] interface {
114113
// If the length of the [subscriber.ReceiveResponse] is zero this means no messages were available.
115114
Receive(ctx context.Context) ([]*subscriber.ReceiveResponse[V], error)
116115

117-
// Commit commits a channel and position meaning the message will be removed from the topic.
116+
// Commit commits a channel and position meaning the message will not be get redelivered if the subscriber gets disconnected.
118117
Commit(ctx context.Context, channel int32, position *pb1topics.TopicPosition) (*subscriber.CommitResponse, error)
119118

119+
// Peek attempts to peek messages without removing them. If the length of the [subscriber.ReceiveResponse]
120+
//is zero this means no messages were available.
121+
Peek(ctx context.Context) ([]*subscriber.ReceiveResponse[V], error)
122+
120123
// DestroySubscriberGroup destroys a subscriber group created by this subscriber.
121124
DestroySubscriberGroup(ctx context.Context, subscriberGroup string) error
122125

@@ -296,7 +299,30 @@ func (bt *baseTopicsClient[V]) CreateSubscriberGroup(ctx context.Context, subscr
296299
}
297300

298301
func (bt *baseTopicsClient[V]) DestroySubscriberGroup(ctx context.Context, subscriberGroup string) error {
299-
return DestroySubscriberGroup(ctx, bt.session, subscriberGroup)
302+
return DestroySubscriberGroup(ctx, bt.session, bt.topicID, subscriberGroup)
303+
}
304+
305+
func (bt *baseTopicsClient[V]) isTopicDestroyed() bool {
306+
bt.mutex.RLock()
307+
defer bt.mutex.RUnlock()
308+
return bt.isDestroyed
309+
}
310+
311+
func (bt *baseTopicsClient[V]) setDestroyed() {
312+
bt.mutex.Lock()
313+
defer bt.mutex.Unlock()
314+
bt.isDestroyed = true
315+
}
316+
func (bt *baseTopicsClient[V]) isTopicReleased() bool {
317+
bt.mutex.RLock()
318+
defer bt.mutex.RUnlock()
319+
return bt.isReleased
320+
}
321+
322+
func (bt *baseTopicsClient[V]) setReleased() {
323+
bt.mutex.Lock()
324+
defer bt.mutex.Unlock()
325+
bt.isReleased = true
300326
}
301327

302328
func newPublisher[V any](session *Session, bt *baseTopicsClient[V], result *publisher.EnsurePublisherResult, topicName string, options *publisher.Options) (Publisher[V], error) {
@@ -385,12 +411,12 @@ func closeSubscriber[V any](ctx context.Context, ts *topicSubscriber[V]) error {
385411
return ts.session.v1StreamManagerTopics.destroyPublisherOrSubscriber(ctx, ts.proxyID, pb1topics.TopicServiceRequestType_DestroySubscriber)
386412
}
387413

388-
func receiveInternal[V any](ctx context.Context, ts *topicSubscriber[V], maxMessages *int32) ([]*pb1topics.TopicElement, error) {
414+
func peekOrReceiveInternal[V any](ctx context.Context, ts *topicSubscriber[V], maxMessages *int32, isReceive bool) ([]*pb1topics.TopicElement, error) {
389415
if ts.isClosed {
390416
return nil, ErrSubscriberClosed
391417
}
392418

393-
return ts.session.v1StreamManagerTopics.receive(ctx, ts.proxyID, maxMessages)
419+
return ts.session.v1StreamManagerTopics.peekOrReceive(ctx, ts.proxyID, maxMessages, isReceive)
394420
}
395421

396422
func commitInternal[V any](ctx context.Context, ts *topicSubscriber[V], channel int32, position *pb1topics.TopicPosition) (*subscriber.CommitResponse, error) {
@@ -405,16 +431,24 @@ func (ts *topicSubscriber[V]) DestroySubscriberGroup(ctx context.Context, subscr
405431
return ts.session.v1StreamManagerTopics.destroySubscriberGroup(ctx, ts.proxyID, subscriberGroup)
406432
}
407433

408-
// Receive attempts to receive messages. Messages must be committed to be considered processed unless autoCommit is set.
434+
func (ts *topicSubscriber[V]) Peek(ctx context.Context) ([]*subscriber.ReceiveResponse[V], error) {
435+
return ts.peekOrReceive(ctx, false)
436+
}
437+
409438
func (ts *topicSubscriber[V]) Receive(ctx context.Context) ([]*subscriber.ReceiveResponse[V], error) {
410-
r, err := receiveInternal(ctx, ts, &ts.options.MaxMessages)
439+
return ts.peekOrReceive(ctx, true)
440+
}
441+
442+
// Receive attempts to peekOrReceive messages. Messages must be committed to be considered processed unless autoCommit is set.
443+
func (ts *topicSubscriber[V]) peekOrReceive(ctx context.Context, isReceive bool) ([]*subscriber.ReceiveResponse[V], error) {
444+
r, err := peekOrReceiveInternal(ctx, ts, &ts.options.MaxMessages, isReceive)
411445
if err != nil {
412446
return nil, err
413447
}
414448

415449
values, err := ts.decodeAll(r)
416450
if err != nil {
417-
return nil, fmt.Errorf("unable to decode receive values: %v", err)
451+
return nil, fmt.Errorf("unable to decode peek rr receive values: %v", err)
418452
}
419453

420454
if ts.options.AutoCommit && len(values) > 0 {
@@ -530,7 +564,6 @@ func CreatSubscriberWithTransformer[E any](ctx context.Context, session *Session
530564
func CreateSubscriber[V any](ctx context.Context, session *Session, topicName string, options ...func(cache *subscriber.Options)) (Subscriber[V], error) {
531565
var (
532566
subscriberOptions = &subscriber.Options{}
533-
binFilter []byte
534567
err error
535568
)
536569

@@ -545,18 +578,15 @@ func CreateSubscriber[V any](ctx context.Context, session *Session, topicName st
545578
}
546579
}
547580

548-
if subscriberOptions.Filter != nil {
549-
binFilter, err = session.genericSerializer.Serialize(subscriberOptions.Filter)
550-
if err != nil {
551-
return nil, err
552-
}
553-
}
554-
555-
result, err := session.v1StreamManagerTopics.ensureSubscriber(ctx, topicName, subscriberOptions.SubscriberGroup, binFilter)
581+
result, err := session.v1StreamManagerTopics.ensureSubscriber(ctx, topicName, subscriberOptions)
556582
if err != nil {
557583
return nil, err
558584
}
559585

586+
if result == nil {
587+
return nil, fmt.Errorf("unable to create subscriber with options %v, check server logs: %v", subscriberOptions, err)
588+
}
589+
560590
return newSubscriber[V](session, nil, result, topicName, subscriberOptions)
561591
}
562592

@@ -590,15 +620,14 @@ func CreateSubscriberGroup(ctx context.Context, session *Session, topicName stri
590620
}
591621

592622
// DestroySubscriberGroup destroys a subscriber group.
593-
func DestroySubscriberGroup(ctx context.Context, session *Session, subscriberGroup string) error {
623+
func DestroySubscriberGroup(ctx context.Context, session *Session, proxyID int32, subscriberGroup string) error {
594624
if session.v1StreamManagerTopics == nil {
595625
if err := ensureV1StreamManagerTopics(session); err != nil {
596626
return err
597627
}
598628
}
599629

600-
// TODO: Is this valid???, how can we determine the proxyID if we don't have a subscriber
601-
return session.v1StreamManagerTopics.destroySubscriberGroup(ctx, 0, subscriberGroup)
630+
return session.v1StreamManagerTopics.destroySubscriberGroup(ctx, proxyID, subscriberGroup)
602631
}
603632

604633
// ensurePublisher ensures a publisher.
@@ -636,8 +665,8 @@ func (m *streamManagerV1) ensurePublisher(ctx context.Context, topicName string,
636665
}
637666

638667
// ensureSubscriber ensures a subscriber.
639-
func (m *streamManagerV1) ensureSubscriber(ctx context.Context, topicName string, subscriberGroup *string, binFilter []byte) (*subscriber.EnsureSubscriberResult, error) {
640-
req, err := m.newEnsureSubscriberRequest(topicName, subscriberGroup, binFilter)
668+
func (m *streamManagerV1) ensureSubscriber(ctx context.Context, topicName string, options *subscriber.Options) (*subscriber.EnsureSubscriberResult, error) {
669+
req, err := m.newEnsureSubscriberRequest(topicName, options)
641670
if err != nil {
642671
return nil, err
643672
}
@@ -672,14 +701,26 @@ func (m *streamManagerV1) ensureSubscriber(ctx context.Context, topicName string
672701
return &s, nil
673702
}
674703

675-
// receive calls receive for a subscriber.
676-
func (m *streamManagerV1) receive(ctx context.Context, proxyID int32, maxMessages *int32) ([]*pb1topics.TopicElement, error) {
677-
req, err := m.newSubscriberReceiveRequest(proxyID, maxMessages)
704+
// peekOrReceive calls peek or receive for a subscriber.
705+
func (m *streamManagerV1) peekOrReceive(ctx context.Context, proxyID int32, maxMessages *int32, isReceive bool) ([]*pb1topics.TopicElement, error) {
706+
var (
707+
messageType pb1topics.TopicServiceRequestType
708+
err error
709+
req *pb1.ProxyRequest
710+
)
711+
if isReceive {
712+
messageType = pb1topics.TopicServiceRequestType_Receive
713+
req, err = m.newSubscriberReceiveRequest(proxyID, maxMessages)
714+
} else {
715+
messageType = pb1topics.TopicServiceRequestType_PeekAtPosition
716+
req, err = m.newSubscriberPeekRequest(proxyID, maxMessages)
717+
}
718+
678719
if err != nil {
679720
return nil, err
680721
}
681722

682-
requestType, err := m.submitTopicRequest(req, pb1topics.TopicServiceRequestType_SimpleReceive)
723+
requestType, err := m.submitTopicRequest(req, messageType)
683724

684725
newCtx, cancel := m.session.ensureContext(ctx)
685726
if cancel != nil {
@@ -789,7 +830,7 @@ func (m *streamManagerV1) ensureSubscriberGroup(ctx context.Context, topicName s
789830
return err
790831
}
791832

792-
// destroySubscriberGroup destroys a subscriber.
833+
// destroySubscriberGroup destroys a subscriber group.
793834
func (m *streamManagerV1) destroySubscriberGroup(ctx context.Context, proxyID int32, subscriberGroup string) error {
794835
req, err := m.newDestroySubscriberGroupRequest(proxyID, subscriberGroup)
795836
if err != nil {
@@ -952,7 +993,7 @@ func ensureV1StreamManagerTopics(session *Session) error {
952993
}
953994

954995
func releaseTopicInternal[V any](ctx context.Context, bt *baseTopicsClient[V], destroy bool) error {
955-
if bt.isDestroyed || bt.isReleased {
996+
if bt.isTopicDestroyed() || bt.isTopicReleased() {
956997
return ErrTopicDestroyedOrReleased
957998
}
958999

@@ -969,12 +1010,12 @@ func releaseTopicInternal[V any](ctx context.Context, bt *baseTopicsClient[V], d
9691010
if err != nil {
9701011
return err
9711012
}
972-
bt.isDestroyed = true
1013+
bt.setDestroyed()
9731014
bt.generateTopicLifecycleEvent(nil, TopicDestroyed)
9741015
} else {
9751016
if existingTopic, ok := bt.session.topics[bt.name]; ok {
9761017
bt.generateTopicLifecycleEvent(existingTopic, TopicReleased)
977-
bt.isReleased = true
1018+
bt.setReleased()
9781019
}
9791020
}
9801021

@@ -1009,11 +1050,25 @@ func (m *streamManagerV1) newDestroyPublisherOrSubscriberRequest(proxyID int32,
10091050
return m.newWrapperProxyTopicsRequest("", requestType, anyReq)
10101051
}
10111052

1012-
func (m *streamManagerV1) newEnsureSubscriberRequest(topicName string, subscriberGroup *string, binFilter []byte) (*pb1.ProxyRequest, error) {
1053+
func (m *streamManagerV1) newEnsureSubscriberRequest(topicName string, options *subscriber.Options) (*pb1.ProxyRequest, error) {
1054+
var (
1055+
err error
1056+
binFilter []byte
1057+
)
1058+
1059+
if options.Filter != nil {
1060+
binFilter, err = m.session.genericSerializer.Serialize(options.Filter)
1061+
if err != nil {
1062+
return nil, err
1063+
}
1064+
}
1065+
10131066
req := &pb1topics.EnsureSimpleSubscriberRequest{
10141067
Topic: topicName,
10151068
Filter: binFilter,
1016-
SubscriberGroup: subscriberGroup,
1069+
SubscriberGroup: options.SubscriberGroup,
1070+
Extractor: options.Extractor,
1071+
Channels: options.Channels,
10171072
}
10181073

10191074
anyReq, err := anypb.New(req)
@@ -1034,6 +1089,18 @@ func (m *streamManagerV1) newSubscriberReceiveRequest(ProxyID int32, maxMessages
10341089
}
10351090
return m.newWrapperProxyPublisherRequest(ProxyID, pb1topics.TopicServiceRequestType_SimpleReceive, anyReq)
10361091
}
1092+
func (m *streamManagerV1) newSubscriberPeekRequest(ProxyID int32, maxMessages *int32) (*pb1.ProxyRequest, error) {
1093+
req := &pb1topics.SimpleReceiveRequest{
1094+
MaxMessages: maxMessages,
1095+
}
1096+
1097+
anyReq, err := anypb.New(req)
1098+
if err != nil {
1099+
return nil, err
1100+
}
1101+
// TODO: FIX THIS when SImplePeek available
1102+
return m.newWrapperProxyPublisherRequest(ProxyID, pb1topics.TopicServiceRequestType_SimpleReceive, anyReq)
1103+
}
10371104

10381105
func (m *streamManagerV1) newSubscriberCommitRequest(ProxyID int32, channel int32, position *pb1topics.TopicPosition) (*pb1.ProxyRequest, error) {
10391106
req := &pb1topics.ChannelAndPosition{

coherence/topics_events.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (t *topicLifecycleListener[V]) on(event TopicLifecycleEventType, callback f
9999
}
100100

101101
func (bt *baseTopicsClient[V]) AddLifecycleListener(listener TopicLifecycleListener[V]) error {
102-
if bt.isDestroyed || bt.isReleased {
102+
if bt.isTopicDestroyed() || bt.isTopicReleased() {
103103
return ErrTopicDestroyedOrReleased
104104
}
105105

@@ -108,7 +108,7 @@ func (bt *baseTopicsClient[V]) AddLifecycleListener(listener TopicLifecycleListe
108108
}
109109

110110
func (bt *baseTopicsClient[V]) RemoveLifecycleListener(listener TopicLifecycleListener[V]) error {
111-
if bt.isDestroyed || bt.isReleased {
111+
if bt.isTopicDestroyed() || bt.isTopicReleased() {
112112
return ErrTopicDestroyedOrReleased
113113
}
114114

coherence/v1client.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ var (
3737
ALL logLevel = 5 // all messages
3838

3939
// current log level
40+
logLevelMutex sync.RWMutex
4041
currentLogLevel int
4142
)
4243

@@ -251,6 +252,9 @@ func (m *streamManagerV1) processResponseMessage(id int64, resp *responseMessage
251252

252253
// logMessage logs a message only if the level <= currentLogLevel
253254
func logMessage(level logLevel, format string, args ...any) {
255+
logLevelMutex.RLock()
256+
defer logLevelMutex.RUnlock()
257+
254258
if int(level) <= currentLogLevel {
255259
log.Println(getLogMessage(level, format, args...))
256260
}
@@ -561,7 +565,7 @@ func (m *streamManagerV1) ensure(ctx context.Context, name string, IDMap safeMap
561565
return nil, err
562566
}
563567

564-
// store the cache id in the session , no lock required as already locked
568+
// store the cache/topic/queue id in the session, no lock required as already locked
565569
ID = &message.Value
566570
IDMap.Add(name, *ID)
567571

java/coherence-go-test/src/main/resources/test-cache-config.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@
3232
<topic-name>*</topic-name>
3333
<scheme-name>topic-scheme</scheme-name>
3434
</topic-mapping>
35+
<topic-mapping>
36+
<topic-name>durable-*</topic-name>
37+
<scheme-name>topic-scheme</scheme-name>
38+
<subscriber-groups>
39+
<subscriber-group>
40+
<name>subscriber-group</name>
41+
</subscriber-group>
42+
</subscriber-groups>
43+
</topic-mapping>
3544
</topic-scheme-mapping>
3645

3746
<caching-schemes>

0 commit comments

Comments
 (0)