Skip to content

Commit 7d63aa2

Browse files
author
Tim Middleton
committed
Minor changes to topics
1 parent 630eab5 commit 7d63aa2

File tree

4 files changed

+36
-52
lines changed

4 files changed

+36
-52
lines changed

coherence/publisher/publisher.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
var (
1717
_ OrderingOption = &OrderByDefault{}
1818
_ OrderingOption = &OrderByRoundRobin{}
19+
//_ OrderingOption = &OrderByValue{}
1920
)
2021

2122
// PublishStatus provides the result of a publish operation.
@@ -108,12 +109,3 @@ func (o *OrderByRoundRobin) GetPublishHash() int32 {
108109
// #nosec G115 -- val is guaranteed to be in int32 range
109110
return int32(newVal)
110111
}
111-
112-
//// OrderByValue defines default ordering.
113-
//type OrderByValue[V any] struct {
114-
// value V
115-
//}
116-
//
117-
//func (r *OrderByValue[V]) GetPublishHash() int32 {
118-
//
119-
//}

coherence/topics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type NamedTopic[V any] interface {
7070
CreateSubscriberGroup(ctx context.Context, subscriberGroup string, options ...func(o *subscribergroup.Options)) error
7171

7272
// DestroySubscriberGroup destroys a subscriber group.
73+
// TODO: Not viable to implement?
7374
DestroySubscriberGroup(ctx context.Context, subscriberGroup string) error
7475
}
7576

test/e2e/topics/subscriber_test.go

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,9 @@ import (
1414
"github.com/oracle/coherence-go-client/v2/coherence/filters"
1515
"github.com/oracle/coherence-go-client/v2/coherence/subscriber"
1616
"github.com/oracle/coherence-go-client/v2/coherence/subscribergroup"
17-
"github.com/oracle/coherence-go-client/v2/coherence/topic"
1817
"github.com/oracle/coherence-go-client/v2/test/utils"
1918
"log"
2019
"testing"
21-
"time"
2220
)
2321

2422
func TestSubscriberWithFilter(t *testing.T) {
@@ -30,7 +28,7 @@ func TestSubscriberWithFilter(t *testing.T) {
3028

3129
const topicName = "my-topic-anon-filter"
3230

33-
session1, topic1 := getSessionAndTopic(g, topicName)
31+
session1, topic1 := getSessionAndTopic[utils.Person](g, topicName)
3432
defer session1.Close()
3533

3634
// create a subscriber with a filter
@@ -50,7 +48,7 @@ func TestSubscriberWithTransformer(t *testing.T) {
5048

5149
const topicName = "my-topic-anon-transformer"
5250

53-
session1, topic1 := getSessionAndTopic(g, topicName)
51+
session1, topic1 := getSessionAndTopic[utils.Person](g, topicName)
5452
defer session1.Close()
5553

5654
extractor := extractors.Extract[string]("name")
@@ -72,7 +70,7 @@ func TestSubscriberWithTransformerAndFilter(t *testing.T) {
7270

7371
const topicName = "my-topic-anon-transformer"
7472

75-
session1, topic1 := getSessionAndTopic(g, topicName)
73+
session1, topic1 := getSessionAndTopic[utils.Person](g, topicName)
7674
defer session1.Close()
7775

7876
extractor := extractors.Extract[string]("name")
@@ -111,7 +109,7 @@ func TestSubscriberGroupWithinTopic(t *testing.T) {
111109
subGroup = "sub-group-1"
112110
)
113111

114-
session1, topic1 := getSessionAndTopic(g, topicName)
112+
session1, topic1 := getSessionAndTopic[utils.Person](g, topicName)
115113
defer session1.Close()
116114

117115
err = topic1.CreateSubscriberGroup(ctx, subGroup)
@@ -147,7 +145,7 @@ func runTestSubscriberGroup(g *gomega.WithT, options ...func(o *subscribergroup.
147145
subGroup = "sub-group-1"
148146
)
149147

150-
session1, topic1 := getSessionAndTopic(g, topicName)
148+
session1, topic1 := getSessionAndTopic[utils.Person](g, topicName)
151149
defer session1.Close()
152150

153151
err = topic1.CreateSubscriberGroup(ctx, subGroup, options...)
@@ -163,14 +161,3 @@ func runTestSubscriberGroup(g *gomega.WithT, options ...func(o *subscribergroup.
163161

164162
g.Expect(topic1.Destroy(ctx)).ShouldNot(gomega.HaveOccurred())
165163
}
166-
167-
func getSessionAndTopic(g *gomega.WithT, topicName string) (*coherence.Session, coherence.NamedTopic[utils.Person]) {
168-
session1, err := utils.GetSession(coherence.WithRequestTimeout(300 * time.Second))
169-
g.Expect(err).ShouldNot(gomega.HaveOccurred())
170-
171-
topic1, err := coherence.GetNamedTopic[utils.Person](context.Background(), session1, topicName, topic.WithChannelCount(17))
172-
g.Expect(err).ShouldNot(gomega.HaveOccurred())
173-
log.Println(topic1)
174-
175-
return session1, topic1
176-
}

test/e2e/topics/topics_test.go

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"github.com/onsi/gomega"
1313
"github.com/oracle/coherence-go-client/v2/coherence"
14+
"github.com/oracle/coherence-go-client/v2/coherence/publisher"
1415
"github.com/oracle/coherence-go-client/v2/coherence/topic"
1516
"github.com/oracle/coherence-go-client/v2/test/utils"
1617
"log"
@@ -20,26 +21,18 @@ import (
2021

2122
func TestBasicTopicCreatedAndDestroy(t *testing.T) {
2223
var (
23-
g = gomega.NewWithT(t)
24-
err error
25-
session1 *coherence.Session
26-
topic1 coherence.NamedTopic[string]
27-
ctx = context.Background()
24+
g = gomega.NewWithT(t)
25+
err error
26+
ctx = context.Background()
2827
)
2928

3029
const topicName = "my-topic"
3130

3231
t.Setenv("COHERENCE_LOG_LEVEL", "ALL")
3332

34-
session1, err = utils.GetSession(coherence.WithRequestTimeout(300 * time.Second))
35-
g.Expect(err).ShouldNot(gomega.HaveOccurred())
33+
session1, topic1 := getSessionAndTopic[string](g, topicName)
3634
defer session1.Close()
3735

38-
// get a NamedQueue with name "my-queue"
39-
topic1, err = coherence.GetNamedTopic[string](ctx, session1, topicName, topic.WithChannelCount(17))
40-
g.Expect(err).ShouldNot(gomega.HaveOccurred())
41-
log.Println(topic1)
42-
4336
utils.Sleep(5)
4437

4538
err = topic1.Destroy(ctx)
@@ -50,26 +43,26 @@ func TestBasicTopicCreatedAndDestroy(t *testing.T) {
5043
g.Expect(err).Should(gomega.HaveOccurred())
5144
}
5245

53-
func TestBasicTopicAnonPubSub(t *testing.T) {
46+
func TestTopicPublish(t *testing.T) {
47+
g := gomega.NewWithT(t)
48+
49+
RunTestBasicTopicAnonPubSub(g)
50+
RunTestBasicTopicAnonPubSub(g, publisher.WithDefaultOrdering())
51+
RunTestBasicTopicAnonPubSub(g, publisher.WithRoundRobinOrdering())
52+
RunTestBasicTopicAnonPubSub(g, publisher.WithChannelCount(21))
53+
}
54+
55+
func RunTestBasicTopicAnonPubSub(g *gomega.WithT, options ...func(cache *publisher.Options)) {
5456
var (
55-
g = gomega.NewWithT(t)
56-
err error
57-
session1 *coherence.Session
58-
topic1 coherence.NamedTopic[string]
59-
ctx = context.Background()
57+
err error
58+
ctx = context.Background()
6059
)
6160

6261
const topicName = "my-topic-anon"
6362

64-
session1, err = utils.GetSession(coherence.WithRequestTimeout(300 * time.Second))
65-
g.Expect(err).ShouldNot(gomega.HaveOccurred())
63+
session1, topic1 := getSessionAndTopic[string](g, topicName)
6664
defer session1.Close()
6765

68-
// get a NamedQueue with name "my-queue"
69-
topic1, err = coherence.GetNamedTopic[string](ctx, session1, topicName, topic.WithChannelCount(17))
70-
g.Expect(err).ShouldNot(gomega.HaveOccurred())
71-
log.Println(topic1)
72-
7366
// create a subscriber first
7467
sub1, err := topic1.CreateSubscriber(ctx)
7568
g.Expect(err).ShouldNot(gomega.HaveOccurred())
@@ -183,3 +176,14 @@ func publishEntriesString(g *gomega.WithT, pub coherence.Publisher[string], coun
183176
g.Expect(status).ShouldNot(gomega.BeNil())
184177
}
185178
}
179+
180+
func getSessionAndTopic[V any](g *gomega.WithT, topicName string) (*coherence.Session, coherence.NamedTopic[V]) {
181+
session1, err := utils.GetSession(coherence.WithRequestTimeout(300 * time.Second))
182+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
183+
184+
topic1, err := coherence.GetNamedTopic[V](context.Background(), session1, topicName, topic.WithChannelCount(17))
185+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
186+
log.Println(topic1)
187+
188+
return session1, topic1
189+
}

0 commit comments

Comments
 (0)