Skip to content

Commit aaaf690

Browse files
committed
fix linter
1 parent b8f3944 commit aaaf690

File tree

8 files changed

+73
-22
lines changed

8 files changed

+73
-22
lines changed

internal/topic/topiclistenerinternal/event_handler.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ type EventHandler interface {
2323
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
2424
OnReadMessages(ctx context.Context, event *PublicReadMessages) error
2525

26-
// OnStopPartitionSessionRequest called when the server send stop partition message. It means that no more OnReadMessages
27-
// calls for the partition session.
26+
// OnStopPartitionSessionRequest called when the server send stop partition message.
27+
// It means that no more OnReadMessages calls for the partition session.
2828
// You must call event.Confirm() for allow the server to stop the partition session (if event.Graceful=true).
2929
// Confirm is optional for event.Graceful=false
3030
// The method can be called twice: with event.Graceful=true, then event.Graceful=false.
@@ -49,7 +49,6 @@ type PublicStartPartitionSessionEvent struct {
4949
PartitionSession topicreadercommon.PublicPartitionSession
5050
CommittedOffset int64
5151
PartitionOffsets PublicOffsetsRange
52-
resp PublicStartPartitionSessionConfirm
5352
respChan chan PublicStartPartitionSessionConfirm
5453
}
5554

@@ -77,6 +76,7 @@ type PublicStartPartitionSessionConfirm struct {
7776
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
7877
func (c PublicStartPartitionSessionConfirm) WithReadOffet(val int64) PublicStartPartitionSessionConfirm {
7978
c.readOffset = &val
79+
8080
return c
8181
}
8282

@@ -85,6 +85,7 @@ func (c PublicStartPartitionSessionConfirm) WithReadOffet(val int64) PublicStart
8585
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
8686
func (c PublicStartPartitionSessionConfirm) WithCommitOffset(val int64) PublicStartPartitionSessionConfirm {
8787
c.CommitOffset = &val
88+
8889
return c
8990
}
9091

internal/topic/topiclistenerinternal/stream_listener.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,13 @@ func newStreamListener(
5353
res.initVars(sessionIDCounter)
5454
if err := res.initStream(connectionCtx, client); err != nil {
5555
res.closeWithTimeout(connectionCtx, err)
56+
5657
return nil, err
5758
}
5859

5960
res.startBackground()
6061
res.sendDataRequest(config.BufferSize)
62+
6163
return res, nil
6264
}
6365

@@ -114,6 +116,7 @@ func (l *streamListener) initVars(sessionIDCounter *atomic.Int64) {
114116
}
115117
}
116118

119+
//nolint:funlen
117120
func (l *streamListener) initStream(ctx context.Context, client TopicClient) error {
118121
streamCtx, streamClose := context.WithCancelCause(xcontext.ValueOnly(ctx))
119122
l.streamClose = streamClose
@@ -150,20 +153,33 @@ func (l *streamListener) initStream(ctx context.Context, client TopicClient) err
150153

151154
resp, err := l.stream.Recv()
152155
if err != nil {
153-
return xerrors.WithStackTrace(fmt.Errorf("ydb: failed to receive init response for read stream in the listener: %w", err))
156+
return xerrors.WithStackTrace(fmt.Errorf(
157+
"ydb: failed to receive init response for read stream in the listener: %w",
158+
err,
159+
))
154160
}
155161

156162
if status := resp.StatusData(); !status.Status.IsSuccess() {
157-
// TODO: better handler status error
158-
return xerrors.WithStackTrace(fmt.Errorf("ydb: received bad status on init the topic stream listener: %v (%v)", status.Status, status.Issues))
163+
// wrap initialization error as operation status error - for handle with retrier
164+
// https://github.com/ydb-platform/ydb-go-sdk/issues/1361
165+
return xerrors.WithStackTrace(fmt.Errorf(
166+
"ydb: received bad status on init the topic stream listener: %v (%v)",
167+
status.Status,
168+
status.Issues,
169+
))
159170
}
160171

161172
initResp, ok := resp.(*rawtopicreader.InitResponse)
162173
if !ok {
163-
return xerrors.WithStackTrace(fmt.Errorf("bad message type on session init: %v (%v)", resp, reflect.TypeOf(resp)))
174+
return xerrors.WithStackTrace(fmt.Errorf(
175+
"bad message type on session init: %v (%v)",
176+
resp,
177+
reflect.TypeOf(resp),
178+
))
164179
}
165180

166181
l.sessionID = initResp.SessionID
182+
167183
return nil
168184
}
169185

@@ -178,13 +194,17 @@ func (l *streamListener) sendMessagesLoop(ctx context.Context) {
178194
l.m.WithLock(func() {
179195
messages = l.messagesToSend
180196
if len(messages) > 0 {
181-
l.messagesToSend = make([]rawtopicreader.ClientMessage, 0, len(messages)*2)
197+
l.messagesToSend = make([]rawtopicreader.ClientMessage, 0, cap(messages))
182198
}
183199
})
184200

185201
for _, m := range messages {
186202
if err := l.stream.Send(m); err != nil {
187-
l.closeWithTimeout(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed send message by grpc to topic reader stream from listener: %w", err))))
203+
l.closeWithTimeout(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
204+
"ydb: failed send message by grpc to topic reader stream from listener: %w",
205+
err,
206+
))))
207+
188208
return
189209
}
190210
}
@@ -203,6 +223,7 @@ func (l *streamListener) receiveMessagesLoop(ctx context.Context) {
203223
l.closeWithTimeout(ctx, xerrors.WithStackTrace(
204224
fmt.Errorf("ydb: failed read message from the stream in the topic reader listener: %w", err),
205225
))
226+
206227
return
207228
}
208229

@@ -218,14 +239,17 @@ func (l *streamListener) onReceiveServerMessage(ctx context.Context, mess rawtop
218239
case *rawtopicreader.StopPartitionSessionRequest:
219240
err = l.onStopPartitionRequest(ctx, m)
220241
case *rawtopicreader.ReadResponse:
221-
err = l.onReadResponse(ctx, m)
242+
err = l.onReadResponse(m)
222243
}
223244
if err != nil {
224245
l.closeWithTimeout(ctx, err)
225246
}
226247
}
227248

228-
func (l *streamListener) onStartPartitionRequest(ctx context.Context, m *rawtopicreader.StartPartitionSessionRequest) error {
249+
func (l *streamListener) onStartPartitionRequest(
250+
ctx context.Context,
251+
m *rawtopicreader.StartPartitionSessionRequest,
252+
) error {
229253
session := topicreadercommon.NewPartitionSession(
230254
ctx,
231255
m.PartitionSession.Path,
@@ -276,10 +300,14 @@ func (l *streamListener) onStartPartitionRequest(ctx context.Context, m *rawtopi
276300
}
277301

278302
l.sendMessage(resp)
303+
279304
return nil
280305
}
281306

282-
func (l *streamListener) onStopPartitionRequest(ctx context.Context, m *rawtopicreader.StopPartitionSessionRequest) error {
307+
func (l *streamListener) onStopPartitionRequest(
308+
ctx context.Context,
309+
m *rawtopicreader.StopPartitionSessionRequest,
310+
) error {
283311
session, err := l.sessions.Get(m.PartitionSessionID)
284312
if !m.Graceful && session == nil {
285313
// stop partition may be received twice: graceful and force
@@ -331,10 +359,11 @@ func (l *streamListener) onStopPartitionRequest(ctx context.Context, m *rawtopic
331359
if m.Graceful {
332360
l.sendMessage(&rawtopicreader.StopPartitionSessionResponse{PartitionSessionID: session.StreamPartitionSessionID})
333361
}
362+
334363
return nil
335364
}
336365

337-
func (l *streamListener) onReadResponse(ctx context.Context, m *rawtopicreader.ReadResponse) error {
366+
func (l *streamListener) onReadResponse(m *rawtopicreader.ReadResponse) error {
338367
batches, err := topicreadercommon.ReadRawBatchesToPublicBatches(m, l.sessions, l.cfg.Decoders)
339368
if err != nil {
340369
return err
@@ -349,6 +378,7 @@ func (l *streamListener) onReadResponse(ctx context.Context, m *rawtopicreader.R
349378
}
350379
}
351380
l.sendDataRequest(m.BytesSize)
381+
352382
return nil
353383
}
354384

internal/topic/topiclistenerinternal/stream_listener_fixtures_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ func StreamListener(e fixenv.Env) *streamListener {
2020
}
2121
l.handler = EventHandlerMock(e)
2222
l.sessions = PartitionStorage(e)
23+
2324
return fixenv.NewGenericResult(l), nil
2425
}
26+
2527
return fixenv.CacheResult(e, f)
2628
}
2729

@@ -31,8 +33,10 @@ func PartitionStorage(e fixenv.Env) *topicreadercommon.PartitionSessionStorage {
3133
if err := storage.Add(PartitionSession(e)); err != nil {
3234
return nil, err
3335
}
36+
3437
return fixenv.NewGenericResult(storage), nil
3538
}
39+
3640
return fixenv.CacheResult(e, f)
3741
}
3842

@@ -49,29 +53,36 @@ func PartitionSession(e fixenv.Env) *topicreadercommon.PartitionSession {
4953
0,
5054
)), nil
5155
}
56+
5257
return fixenv.CacheResult(e, f)
5358
}
5459

5560
func MockController(e fixenv.Env) *gomock.Controller {
5661
f := func() (*fixenv.GenericResult[*gomock.Controller], error) {
5762
mc := gomock.NewController(e.T().(gomock.TestReporter))
63+
5864
return fixenv.NewGenericResult(mc), nil
5965
}
66+
6067
return fixenv.CacheResult(e, f)
6168
}
6269

6370
func StreamMock(e fixenv.Env) *rawtopicreadermock.MockTopicReaderStreamInterface {
6471
f := func() (*fixenv.GenericResult[*rawtopicreadermock.MockTopicReaderStreamInterface], error) {
6572
m := rawtopicreadermock.NewMockTopicReaderStreamInterface(MockController(e))
73+
6674
return fixenv.NewGenericResult(m), nil
6775
}
76+
6877
return fixenv.CacheResult(e, f)
6978
}
7079

7180
func EventHandlerMock(e fixenv.Env) *MockEventHandler {
7281
f := func() (*fixenv.GenericResult[*MockEventHandler], error) {
7382
m := NewMockEventHandler(MockController(e))
83+
7484
return fixenv.NewGenericResult(m), nil
7585
}
86+
7687
return fixenv.CacheResult(e, f)
7788
}

internal/topic/topiclistenerinternal/stream_listener_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
3434
EventHandlerMock(e).EXPECT().OnReadMessages(PartitionSession(e).Context(), gomock.Any()).
3535
DoAndReturn(func(ctx context.Context, event *PublicReadMessages) error {
3636
require.Equal(t, PartitionSession(e).ClientPartitionSessionID, event.PartitionSession.PartitionSessionID)
37-
require.Equal(t, event.Batch.Messages[0].SeqNo, seqNo)
37+
require.Equal(t, seqNo, event.Batch.Messages[0].SeqNo)
38+
3839
return nil
3940
})
4041

@@ -92,6 +93,7 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
9293
WithReadOffet(respReadOffset).
9394
WithCommitOffset(respCommitOffset),
9495
)
96+
9597
return nil
9698
})
9799

@@ -142,6 +144,7 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
142144
require.True(t, event.Graceful)
143145
require.Equal(t, int64(5), event.CommittedOffset)
144146
event.Confirm()
147+
145148
return nil
146149
})
147150

@@ -175,6 +178,7 @@ func TestStreamListener_CloseSessionsOnCloseListener(t *testing.T) {
175178
require.False(t, event.Graceful)
176179
require.Equal(t, PartitionSession(e).CommittedOffset().ToInt64(), event.CommittedOffset)
177180
event.Confirm()
181+
178182
return nil
179183
})
180184
require.NoError(t, StreamListener(e).Close(sf.Context(e), errors.New("test")))

internal/topic/topiclistenerinternal/topic_listener_reconnector.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,13 @@ func (lr *TopicListenerReconnector) Close(ctx context.Context, reason error) err
5757
}
5858

5959
func (lr *TopicListenerReconnector) connect(connectionCtx context.Context) {
60-
lr.streamListener, lr.connectionResult = newStreamListener(connectionCtx, lr.client, lr.handler, lr.streamConfig, &lr.connectionIDCounter)
60+
lr.streamListener, lr.connectionResult = newStreamListener(
61+
connectionCtx,
62+
lr.client,
63+
lr.handler,
64+
lr.streamConfig,
65+
&lr.connectionIDCounter,
66+
)
6167
close(lr.connectionCompleted)
6268
}
6369

@@ -85,6 +91,7 @@ func (lr *TopicListenerReconnector) WaitStop(ctx context.Context) error {
8591
if errors.Is(err, ErrUserCloseTopic) {
8692
return nil
8793
}
94+
8895
return err
8996
}
9097
}

internal/topic/topicreaderinternal/callback_reader.go

Lines changed: 0 additions & 3 deletions
This file was deleted.

topic/topiclistener/topic_listener.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ type TopicListener struct {
1212
listenerReconnector *topiclistenerinternal.TopicListenerReconnector
1313
}
1414

15-
func NewTopicListener(client *rawtopic.Client, config *topiclistenerinternal.StreamListenerConfig, handler EventHandler) (*TopicListener, error) {
15+
func NewTopicListener(
16+
client *rawtopic.Client, //nolint:interfacer
17+
config *topiclistenerinternal.StreamListenerConfig,
18+
handler EventHandler,
19+
) (*TopicListener, error) {
1620
reconnector, err := topiclistenerinternal.NewTopicListenerReconnector(client, config, handler)
1721
if err != nil {
1822
return nil, err

topic/topicoptions/topicoptions_reader.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@ func ReadTopic(path string) ReadSelectors {
2525
// ReaderOption options for topic reader
2626
type ReaderOption = topicreaderinternal.PublicReaderOption
2727

28-
// CallbackReaderOption options for topic callback reader
29-
type CallbackReaderOption = topicreaderinternal.PublicCallbackReaderOption
30-
3128
// WithReaderOperationTimeout
3229
//
3330
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

0 commit comments

Comments
 (0)