Skip to content

Commit e15e499

Browse files
authored
Merge pull request #1358 Topic callback reader initial experimental
2 parents dbd6566 + ad3d5bb commit e15e499

31 files changed

+1763
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Added experimental topic listener implementation
12
* Fixed `internal/xstrings.Buffer()` leak without call `buffer.Free()`
23
* Removed double quotas from goroutine labels background workers for prevent problem with pprof
34

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package topicreaderexamples
2+
3+
import (
4+
"context"
5+
"log"
6+
"sync"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topiclistener"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
11+
)
12+
13+
func StartReader(ctx context.Context, db *ydb.Driver) (*topiclistener.TopicListener, error) {
14+
handler := &TopicEventsHandler{
15+
locks: make(map[int64]int64),
16+
}
17+
18+
reader, err := db.Topic().StartListener("consumer", handler, topicoptions.ReadTopic("my-topic"))
19+
if err != nil {
20+
return nil, err
21+
}
22+
if err = reader.WaitInit(ctx); err != nil {
23+
return nil, err
24+
}
25+
26+
return reader, nil
27+
}
28+
29+
type TopicEventsHandler struct {
30+
topiclistener.BaseHandler
31+
listener *topiclistener.TopicListener
32+
33+
m sync.Mutex
34+
locks map[int64]int64 // [partitionSessionID]lockID
35+
}
36+
37+
func (h *TopicEventsHandler) OnReaderCreated(req *topiclistener.ReaderReady) error {
38+
h.listener = req.Listener
39+
40+
return nil
41+
}
42+
43+
func (h *TopicEventsHandler) OnReadMessages(
44+
ctx context.Context,
45+
event *topiclistener.ReadMessages,
46+
) error {
47+
for _, mess := range event.Batch.Messages {
48+
log.Printf("Receive message: %v/%v/%v", mess.Topic(), mess.PartitionID(), mess.SeqNo)
49+
}
50+
51+
processBatch(ctx, event.Batch)
52+
53+
return nil
54+
}
55+
56+
func (h *TopicEventsHandler) OnStartPartitionSessionRequest(
57+
ctx context.Context,
58+
event *topiclistener.StartPartitionSessionRequest,
59+
) error {
60+
lockID, offset, err := lockPartition(ctx, event.PartitionSession.TopicPath, event.PartitionSession.PartitionID)
61+
62+
h.m.Lock()
63+
h.locks[event.PartitionSession.PartitionSessionID] = lockID
64+
h.m.Unlock()
65+
66+
log.Printf("Started read partition %v/%v", event.PartitionSession.TopicPath, event.PartitionSession.PartitionID)
67+
event.ConfirmWithParams(
68+
topiclistener.StartPartitionSessionConfirm{}.
69+
WithReadOffet(offset).
70+
WithCommitOffset(offset),
71+
)
72+
73+
return err
74+
}
75+
76+
func (h *TopicEventsHandler) OnStopPartitionSessionRequest(
77+
ctx context.Context,
78+
event *topiclistener.StopPartitionSessionRequest,
79+
) error {
80+
h.m.Lock()
81+
lockID := h.locks[event.PartitionSession.PartitionSessionID]
82+
delete(h.locks, event.PartitionSession.PartitionSessionID)
83+
h.m.Unlock()
84+
85+
err := unlockPartition(ctx, lockID)
86+
event.Confirm()
87+
88+
return err
89+
}
90+
91+
func lockPartition(ctx context.Context, topic string, partitionID int64) (lockID, offset int64, err error) {
92+
panic("not implemented in the example")
93+
}
94+
95+
func unlockPartition(ctx context.Context, lockID int64) error {
96+
panic("not implemented in the example")
97+
}

internal/background/worker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ func (b *Worker) CloseReason() error {
122122
return b.closeReason
123123
}
124124

125+
func (b *Worker) StopDone() <-chan empty.Struct {
126+
return b.tasksCompleted
127+
}
128+
125129
func (b *Worker) init() {
126130
b.onceInit.Do(func() {
127131
if b.ctx == nil {
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package rawtopicreader
2+
3+
// Check interface implementation
4+
var _ TopicReaderStreamInterface = StreamReader{}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package rawtopicreader
2+
3+
//go:generate mockgen -destination ../rawtopicreadermock/stream_reader_stream_interface_mock.go -package rawtopicreadermock -write_package_comment=false --typed . TopicReaderStreamInterface
4+
5+
type TopicReaderStreamInterface interface {
6+
Recv() (ServerMessage, error)
7+
Send(msg ClientMessage) error
8+
CloseSend() error
9+
}

internal/grpcwrapper/rawtopic/rawtopicreadermock/stream_reader_stream_interface_mock.go

Lines changed: 152 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/topic/topicclientinternal/client.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ import (
1010
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic"
1111
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topiclistenerinternal"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon"
1315
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreaderinternal"
1416
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicwriterinternal"
1517
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topiclistener"
1619
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
1720
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
1821
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
@@ -202,6 +205,38 @@ func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.Dro
202205
return call(ctx)
203206
}
204207

208+
// StartListener starts read listen topic with the handler
209+
// it is fast non block call, connection starts in background
210+
func (c *Client) StartListener(
211+
consumer string,
212+
handler topiclistener.EventHandler,
213+
readSelectors topicoptions.ReadSelectors,
214+
opts ...topicoptions.ListenerOption,
215+
) (*topiclistener.TopicListener, error) {
216+
cfg := topiclistenerinternal.NewStreamListenerConfig()
217+
218+
cfg.Consumer = consumer
219+
220+
cfg.Selectors = make([]*topicreadercommon.PublicReadSelector, len(readSelectors))
221+
for i := range readSelectors {
222+
cfg.Selectors[i] = readSelectors[i].Clone()
223+
}
224+
225+
for _, opt := range opts {
226+
if opt == nil {
227+
continue
228+
}
229+
230+
opt(&cfg)
231+
}
232+
233+
if err := cfg.Validate(); err != nil {
234+
return nil, err
235+
}
236+
237+
return topiclistener.NewTopicListener(&c.rawClient, &cfg, handler)
238+
}
239+
205240
// StartReader create new topic reader and start pull messages from server
206241
// it is fast non block call, connection will start in background
207242
func (c *Client) StartReader(

0 commit comments

Comments
 (0)