Skip to content

Commit 2abf3c1

Browse files
author
Tim Middleton
committed
More tests for topics
1 parent 11a755e commit 2abf3c1

File tree

3 files changed

+145
-15
lines changed

3 files changed

+145
-15
lines changed

coherence/subscriber/subscriber.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,12 @@ type ReceiveResult[V any] struct {
3737
type Options struct {
3838
SubscriberGroup *string
3939
Filter filters.Filter
40+
Extractor []byte
4041
}
4142

4243
// TODO: Additional options
4344
//// the optional name of the subscriber group
4445
//SubscriberGroup *string `protobuf:"bytes,2,opt,name=subscriberGroup,proto3,oneof" json:"subscriberGroup,omitempty"`
45-
//// an optional Filter to filter received messages
46-
//Filter []byte `protobuf:"bytes,3,opt,name=filter,proto3,oneof" json:"filter,omitempty"`
4746
//// an optional ValueExtractor to convert received messages
4847
//Extractor []byte `protobuf:"bytes,4,opt,name=extractor,proto3,oneof" json:"extractor,omitempty"`
4948
//// True to return an empty value if the topic is empty
@@ -62,6 +61,13 @@ func WithFilter(fltr filters.Filter) func(options *Options) {
6261
}
6362
}
6463

64+
// WithTransformer returns a function to set the extractor [Subscriber].
65+
func WithTransformer(extractor []byte) func(options *Options) {
66+
return func(s *Options) {
67+
s.Extractor = extractor
68+
}
69+
}
70+
6571
func (o *Options) String() string {
6672
return fmt.Sprintf("options{SubscriberGroup=%v, filter=%v}", o.SubscriberGroup, o.Filter)
6773
}

coherence/topics.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"context"
1111
"errors"
1212
"fmt"
13+
"github.com/oracle/coherence-go-client/v2/coherence/extractors"
1314
"github.com/oracle/coherence-go-client/v2/coherence/publisher"
1415
"github.com/oracle/coherence-go-client/v2/coherence/subscriber"
1516
"github.com/oracle/coherence-go-client/v2/coherence/topic"
@@ -37,6 +38,7 @@ const (
3738
var (
3839
_ Publisher[string] = &topicPublisher[string]{}
3940
_ NamedTopic[string] = &baseTopicsClient[string]{}
41+
_ Subscriber[string] = &topicSubscriber[string]{}
4042

4143
ErrTopicDestroyedOrReleased = errors.New("this topic has been destroyed or released")
4244
ErrTopicsNoSupported = errors.New("the coherence server version must support protocol version 1 or above to use topic")
@@ -55,23 +57,37 @@ type NamedTopic[V any] interface {
5557
// Destroy destroys this topic on the server and releases all resources. After this operation it is no longer usable on the client or server..
5658
Destroy(ctx context.Context) error
5759

60+
// CreatePublisher creates a Publisher with the specified options.
5861
CreatePublisher(ctx context.Context, options ...func(o *publisher.Options)) (Publisher[V], error)
62+
63+
// CreateSubscriber creates a Subscriber with the specified options.
64+
// Note: If you wish to create a Subscriber with a transformer, you should use the helper
65+
// function CreatSubscriberWithTransformer.
5966
CreateSubscriber(ctx context.Context, options ...func(o *subscriber.Options)) (Subscriber[V], error)
6067
}
6168

69+
// Publisher provides the means to publish messages to a [NamedTopic].
6270
type Publisher[V any] interface {
6371
GetProxyID() int32
6472
GetPublisherID() int64
6573
GetChannelCount() int32
74+
75+
// Publish publishes a message and returns a status.
6676
Publish(ctx context.Context, value V) (*publisher.PublishStatus, error)
77+
78+
// Close closes a publisher and releases all resources associated with it in the client
79+
// and on the server.
6780
Close(ctx context.Context) error
6881
}
6982

83+
// Subscriber subscribes directly to a [NamedTopic], or to a subscriber group of a [NamedTopic].
7084
type Subscriber[V any] interface {
85+
// Close closes a subscriber and releases all resources associated with it in the client
86+
// and on the server.
7187
Close(ctx context.Context) error
7288
}
7389

74-
// GetNamedTopic gets a [NamedTopic] of the generic type specified or if a cache already exists with the
90+
// GetNamedTopic gets a [NamedTopic] of the generic type specified or if a topic already exists with the
7591
// same type parameters, it will return it otherwise it will create a new one.
7692
func GetNamedTopic[V any](ctx context.Context, session *Session, topicName string, options ...func(cache *topic.Options)) (NamedTopic[V], error) {
7793
var (
@@ -342,7 +358,8 @@ func ensurePublisherOptions(options ...func(cache *publisher.Options)) *publishe
342358
return publisherOptions
343359
}
344360

345-
// CreatePublisher creates a topic publisher.
361+
// CreatePublisher creates a topic publisher when provided a topicName. You do not have to had created
362+
// a topic, but it is equivalent to called CreatePublisher on a [NamedTopic].
346363
func CreatePublisher[V any](ctx context.Context, session *Session, topicName string, options ...func(cache *publisher.Options)) (Publisher[V], error) {
347364
publisherOptions := ensurePublisherOptions(options...)
348365

@@ -360,6 +377,21 @@ func CreatePublisher[V any](ctx context.Context, session *Session, topicName str
360377
return newPublisher[V](session, nil, result, topicName, publisherOptions)
361378
}
362379

380+
// CreatSubscriberWithTransformer creates a subscriber which will transform the value from the topic using
381+
// the supplied extractor.
382+
func CreatSubscriberWithTransformer[E any](ctx context.Context, session *Session, topicName string,
383+
extractor extractors.ValueExtractor[any, E], options ...func(cache *subscriber.Options)) (Subscriber[E], error) {
384+
385+
binExtractor, err := session.genericSerializer.Serialize(extractor)
386+
if err != nil {
387+
return nil, err
388+
}
389+
390+
options = append(options, subscriber.WithTransformer(binExtractor))
391+
392+
return CreateSubscriber[E](ctx, session, topicName, options...)
393+
}
394+
363395
// CreateSubscriber creates a topic subscriber.
364396
func CreateSubscriber[V any](ctx context.Context, session *Session, topicName string, options ...func(cache *subscriber.Options)) (Subscriber[V], error) {
365397
var (

test/e2e/topics/topics_test.go

Lines changed: 103 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,7 @@ func TestBasicTopicAnonPubSub(t *testing.T) {
8282
g.Expect(err).ShouldNot(gomega.HaveOccurred())
8383
log.Println("Publisher created", pub1)
8484

85-
for i := 1; i <= 1_000; i++ {
86-
status, err2 := pub1.Publish(ctx, fmt.Sprintf("my-value-%d", i))
87-
g.Expect(err2).ShouldNot(gomega.HaveOccurred())
88-
g.Expect(status).ShouldNot(gomega.BeNil())
89-
}
85+
publishEntriesString(g, pub1, 1_000)
9086

9187
utils.Sleep(5)
9288

@@ -122,11 +118,7 @@ func TestCreatePubSubWithoutCreatingTopic(t *testing.T) {
122118
g.Expect(err).ShouldNot(gomega.HaveOccurred())
123119
log.Println("Publisher created", pub1)
124120

125-
for i := 1; i <= 1_000; i++ {
126-
status, err2 := pub1.Publish(ctx, fmt.Sprintf("my-value-%d", i))
127-
g.Expect(err2).ShouldNot(gomega.HaveOccurred())
128-
g.Expect(status).ShouldNot(gomega.BeNil())
129-
}
121+
publishEntriesString(g, pub1, 1_000)
130122

131123
utils.Sleep(5)
132124

@@ -137,7 +129,7 @@ func TestCreatePubSubWithoutCreatingTopic(t *testing.T) {
137129
err = pub1.Close(ctx)
138130
g.Expect(err).Should(gomega.HaveOccurred())
139131

140-
// get teh topic so we can destroy
132+
// get the topic so we can destroy
141133
topic1, err := coherence.GetNamedTopic[string](ctx, session1, topicName, topic.WithChannelCount(17))
142134
g.Expect(err).ShouldNot(gomega.HaveOccurred())
143135

@@ -169,6 +161,42 @@ func TestSubscriberWithFilter(t *testing.T) {
169161
g.Expect(err).ShouldNot(gomega.HaveOccurred())
170162
log.Println("Subscriber created", sub1)
171163

164+
runTest[string](g, topic1, sub1)
165+
}
166+
167+
func TestSubscriberWithTransformer(t *testing.T) {
168+
var (
169+
g = gomega.NewWithT(t)
170+
err error
171+
session1 *coherence.Session
172+
topic1 coherence.NamedTopic[utils.Person]
173+
ctx = context.Background()
174+
)
175+
176+
const topicName = "my-topic-anon-transformer"
177+
178+
session1, err = utils.GetSession(coherence.WithRequestTimeout(300 * time.Second))
179+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
180+
defer session1.Close()
181+
182+
// create a topic that will just return a name from the utils.Person using a transformer
183+
topic1, err = coherence.GetNamedTopic[utils.Person](ctx, session1, topicName, topic.WithChannelCount(17))
184+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
185+
log.Println(topic1)
186+
187+
extractor := extractors.Extract[string]("name")
188+
// create a subscriber with a transformer, this
189+
sub1, err := coherence.CreatSubscriberWithTransformer(ctx, session1, topicName, extractor,
190+
subscriber.WithFilter(filters.GreaterEqual(extractors.Extract[int]("age"), 10)))
191+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
192+
log.Println("Subscriber created", sub1)
193+
194+
runTest[string](g, topic1, sub1)
195+
}
196+
197+
func runTest[E any](g *gomega.WithT, topic1 coherence.NamedTopic[utils.Person], s coherence.Subscriber[E]) {
198+
ctx := context.Background()
199+
172200
pub1, err := topic1.CreatePublisher(context.Background())
173201
g.Expect(err).ShouldNot(gomega.HaveOccurred())
174202
log.Println("Publisher created", pub1)
@@ -186,6 +214,47 @@ func TestSubscriberWithFilter(t *testing.T) {
186214

187215
utils.Sleep(5)
188216

217+
err = s.Close(ctx)
218+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
219+
220+
err = s.Close(ctx)
221+
g.Expect(err).Should(gomega.HaveOccurred())
222+
}
223+
224+
func TestSubscriberWithTransformerAndFilter(t *testing.T) {
225+
var (
226+
g = gomega.NewWithT(t)
227+
err error
228+
session1 *coherence.Session
229+
topic1 coherence.NamedTopic[utils.Person]
230+
ctx = context.Background()
231+
)
232+
233+
const topicName = "my-topic-anon-transformer"
234+
235+
session1, err = utils.GetSession(coherence.WithRequestTimeout(300 * time.Second))
236+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
237+
defer session1.Close()
238+
239+
// create a topic that will just return a name from the utils.Person using a transformer
240+
topic1, err = coherence.GetNamedTopic[utils.Person](ctx, session1, topicName, topic.WithChannelCount(17))
241+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
242+
log.Println(topic1)
243+
244+
extractor := extractors.Extract[string]("name")
245+
// create a subscriber with a transformer, this
246+
sub1, err := coherence.CreatSubscriberWithTransformer(ctx, session1, topicName, extractor)
247+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
248+
log.Println("Subscriber created", sub1)
249+
250+
pub1, err := topic1.CreatePublisher(context.Background())
251+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
252+
log.Println("Publisher created", pub1)
253+
254+
publishEntriesPerson(g, pub1, 1_000)
255+
256+
utils.Sleep(5)
257+
189258
err = sub1.Close(ctx)
190259
g.Expect(err).ShouldNot(gomega.HaveOccurred())
191260

@@ -195,3 +264,26 @@ func TestSubscriberWithFilter(t *testing.T) {
195264
err = topic1.Destroy(ctx)
196265
g.Expect(err).ShouldNot(gomega.HaveOccurred())
197266
}
267+
268+
func publishEntriesPerson(g *gomega.WithT, pub coherence.Publisher[utils.Person], count int) {
269+
ctx := context.Background()
270+
for i := 1; i <= count; i++ {
271+
p := utils.Person{
272+
ID: i,
273+
Name: fmt.Sprintf("my-value-%d", i),
274+
Age: 10 + i,
275+
}
276+
status, err2 := pub.Publish(ctx, p)
277+
g.Expect(err2).ShouldNot(gomega.HaveOccurred())
278+
g.Expect(status).ShouldNot(gomega.BeNil())
279+
}
280+
}
281+
282+
func publishEntriesString(g *gomega.WithT, pub coherence.Publisher[string], count int) {
283+
ctx := context.Background()
284+
for i := 1; i <= count; i++ {
285+
status, err2 := pub.Publish(ctx, fmt.Sprintf("value-%d", i))
286+
g.Expect(err2).ShouldNot(gomega.HaveOccurred())
287+
g.Expect(status).ShouldNot(gomega.BeNil())
288+
}
289+
}

0 commit comments

Comments
 (0)