Skip to content

Commit dbd6566

Browse files
authored
Merge pull request #1360 extract common topic code to dedicated package, prepare for topic listener
prepare for topic listener pr
2 parents c12b35d + a295cc4 commit dbd6566

40 files changed

+1515
-1273
lines changed

internal/topic/topicreaderinternal/batch.go renamed to internal/topic/topicreadercommon/batch.go

Lines changed: 75 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package topicreaderinternal
1+
package topicreadercommon
22

33
import (
44
"context"
@@ -20,17 +20,17 @@ type PublicBatch struct {
2020

2121
Messages []*PublicMessage
2222

23-
commitRange commitRange // от всех сообщений батча
23+
commitRange CommitRange // от всех сообщений батча
2424
}
2525

26-
func newBatch(session *partitionSession, messages []*PublicMessage) (*PublicBatch, error) {
26+
func NewBatch(session *PartitionSession, messages []*PublicMessage) (*PublicBatch, error) {
2727
for i := 0; i < len(messages); i++ {
2828
msg := messages[i]
2929

30-
if msg.commitRange.partitionSession == nil {
31-
msg.commitRange.partitionSession = session
30+
if msg.commitRange.PartitionSession == nil {
31+
msg.commitRange.PartitionSession = session
3232
}
33-
if session != msg.commitRange.partitionSession {
33+
if session != msg.commitRange.PartitionSession {
3434
return nil, xerrors.WithStackTrace(errBadSessionWhileMessageBatchCreate)
3535
}
3636

@@ -39,17 +39,17 @@ func newBatch(session *partitionSession, messages []*PublicMessage) (*PublicBatc
3939
}
4040

4141
prev := messages[i-1]
42-
if prev.commitRange.commitOffsetEnd != msg.commitRange.commitOffsetStart {
42+
if prev.commitRange.CommitOffsetEnd != msg.commitRange.CommitOffsetStart {
4343
return nil, xerrors.WithStackTrace(errBadMessageOffsetWhileMessageBatchCreate)
4444
}
4545
}
4646

47-
offset := commitRange{
48-
partitionSession: session,
47+
offset := CommitRange{
48+
PartitionSession: session,
4949
}
5050
if len(messages) > 0 {
51-
offset.commitOffsetStart = messages[0].commitRange.commitOffsetStart
52-
offset.commitOffsetEnd = messages[len(messages)-1].commitRange.commitOffsetEnd
51+
offset.CommitOffsetStart = messages[0].commitRange.CommitOffsetStart
52+
offset.CommitOffsetEnd = messages[len(messages)-1].commitRange.CommitOffsetEnd
5353
}
5454

5555
return &PublicBatch{
@@ -58,13 +58,13 @@ func newBatch(session *partitionSession, messages []*PublicMessage) (*PublicBatc
5858
}, nil
5959
}
6060

61-
func newBatchFromStream(
62-
decoders decoderMap,
63-
session *partitionSession,
61+
func NewBatchFromStream(
62+
decoders DecoderMap,
63+
session *PartitionSession,
6464
sb rawtopicreader.Batch, //nolint:gocritic
6565
) (*PublicBatch, error) {
6666
messages := make([]*PublicMessage, len(sb.MessageData))
67-
prevOffset := session.lastReceivedMessageOffset()
67+
prevOffset := session.LastReceivedMessageOffset()
6868
for i := range sb.MessageData {
6969
sMess := &sb.MessageData[i]
7070

@@ -82,9 +82,9 @@ func newBatchFromStream(
8282
dstMess.data = createReader(decoders, sb.Codec, sMess.Data)
8383
dstMess.UncompressedSize = int(sMess.UncompressedSize)
8484

85-
dstMess.commitRange.partitionSession = session
86-
dstMess.commitRange.commitOffsetStart = prevOffset + 1
87-
dstMess.commitRange.commitOffsetEnd = sMess.Offset + 1
85+
dstMess.commitRange.PartitionSession = session
86+
dstMess.commitRange.CommitOffsetStart = prevOffset + 1
87+
dstMess.commitRange.CommitOffsetEnd = sMess.Offset + 1
8888

8989
if len(sMess.MetadataItems) > 0 {
9090
dstMess.Metadata = make(map[string][]byte, len(sMess.MetadataItems))
@@ -96,15 +96,15 @@ func newBatchFromStream(
9696
prevOffset = sMess.Offset
9797
}
9898

99-
session.setLastReceivedMessageOffset(prevOffset)
99+
session.SetLastReceivedMessageOffset(prevOffset)
100100

101-
return newBatch(session, messages)
101+
return NewBatch(session, messages)
102102
}
103103

104104
// Context is cancelled when code should stop to process messages batch
105105
// for example - lost connection to server or receive stop partition signal without graceful flag
106106
func (m *PublicBatch) Context() context.Context {
107-
return m.commitRange.partitionSession.Context()
107+
return m.commitRange.PartitionSession.Context()
108108
}
109109

110110
// Topic is path of source topic of the messages in the batch
@@ -117,56 +117,14 @@ func (m *PublicBatch) PartitionID() int64 {
117117
return m.partitionSession().PartitionID
118118
}
119119

120-
func (m *PublicBatch) partitionSession() *partitionSession {
121-
return m.commitRange.partitionSession
120+
func (m *PublicBatch) partitionSession() *PartitionSession {
121+
return m.commitRange.PartitionSession
122122
}
123123

124124
func (m *PublicBatch) getCommitRange() PublicCommitRange {
125125
return m.commitRange.getCommitRange()
126126
}
127127

128-
func (m *PublicBatch) append(b *PublicBatch) (*PublicBatch, error) {
129-
var res *PublicBatch
130-
if m == nil {
131-
res = &PublicBatch{}
132-
} else {
133-
res = m
134-
}
135-
136-
if res.commitRange.partitionSession != b.commitRange.partitionSession {
137-
return nil, xerrors.WithStackTrace(errors.New("ydb: bad partition session for merge"))
138-
}
139-
140-
if res.commitRange.commitOffsetEnd != b.commitRange.commitOffsetStart {
141-
return nil, xerrors.WithStackTrace(errors.New("ydb: bad offset interval for merge"))
142-
}
143-
144-
res.Messages = append(res.Messages, b.Messages...)
145-
res.commitRange.commitOffsetEnd = b.commitRange.commitOffsetEnd
146-
147-
return res, nil
148-
}
149-
150-
func (m *PublicBatch) cutMessages(count int) (head, rest *PublicBatch) {
151-
switch {
152-
case count == 0:
153-
return nil, m
154-
case count >= len(m.Messages):
155-
return m, nil
156-
default:
157-
// slice[0:count:count] - limit slice capacity and prevent overwrite rest by append messages to head
158-
// explicit 0 need for prevent typos, when type slice[count:count] instead of slice[:count:count]
159-
head, _ = newBatch(m.commitRange.partitionSession, m.Messages[0:count:count])
160-
rest, _ = newBatch(m.commitRange.partitionSession, m.Messages[count:])
161-
162-
return head, rest
163-
}
164-
}
165-
166-
func (m *PublicBatch) isEmpty() bool {
167-
return m == nil || len(m.Messages) == 0
168-
}
169-
170128
func splitBytesByMessagesInBatches(batches []*PublicBatch, totalBytesCount int) error {
171129
restBytes := totalBytesCount
172130

@@ -219,3 +177,55 @@ func splitBytesByMessagesInBatches(batches []*PublicBatch, totalBytesCount int)
219177

220178
return nil
221179
}
180+
181+
func BatchAppend(original, appended *PublicBatch) (*PublicBatch, error) {
182+
var res *PublicBatch
183+
if original == nil {
184+
res = &PublicBatch{}
185+
} else {
186+
res = original
187+
}
188+
189+
if res.commitRange.PartitionSession != appended.commitRange.PartitionSession {
190+
return nil, xerrors.WithStackTrace(errors.New("ydb: bad partition session for merge"))
191+
}
192+
193+
if res.commitRange.CommitOffsetEnd != appended.commitRange.CommitOffsetStart {
194+
return nil, xerrors.WithStackTrace(errors.New("ydb: bad offset interval for merge"))
195+
}
196+
197+
res.Messages = append(res.Messages, appended.Messages...)
198+
res.commitRange.CommitOffsetEnd = appended.commitRange.CommitOffsetEnd
199+
200+
return res, nil
201+
}
202+
203+
func BatchCutMessages(b *PublicBatch, count int) (head, rest *PublicBatch) {
204+
switch {
205+
case count == 0:
206+
return nil, b
207+
case count >= len(b.Messages):
208+
return b, nil
209+
default:
210+
// slice[0:count:count] - limit slice capacity and prevent overwrite rest by append messages to head
211+
// explicit 0 need for prevent typos, when type slice[count:count] instead of slice[:count:count]
212+
head, _ = NewBatch(b.commitRange.PartitionSession, b.Messages[0:count:count])
213+
rest, _ = NewBatch(b.commitRange.PartitionSession, b.Messages[count:])
214+
215+
return head, rest
216+
}
217+
}
218+
219+
func BatchIsEmpty(b *PublicBatch) bool {
220+
return b == nil || len(b.Messages) == 0
221+
}
222+
223+
func BatchGetPartitionSessionID(item *PublicBatch) rawtopicreader.PartitionSessionID {
224+
return item.partitionSession().PartitionSessionID
225+
}
226+
227+
func BatchSetCommitRangeForTest(b *PublicBatch, commitRange CommitRange) *PublicBatch {
228+
b.commitRange = commitRange
229+
230+
return b
231+
}

0 commit comments

Comments
 (0)