From 99d0d394f2b55e84747a9a99aea56c2e8af5066a Mon Sep 17 00:00:00 2001 From: DLC Date: Sun, 16 Feb 2025 23:05:47 +0300 Subject: [PATCH 01/17] feat: added sync.Pool for reuse decoders in topic --- internal/topic/topicreadercommon/decoders.go | 56 +++++++++++++++---- .../topic/topicreadercommon/decoders_test.go | 51 +++++++++++++++++ 2 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 internal/topic/topicreadercommon/decoders_test.go diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index a0625aed5..8c2394ebb 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -5,35 +5,69 @@ import ( "errors" "fmt" "io" + "sync" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" ) +type decoderPool struct { + pool sync.Pool +} + +func (p *decoderPool) Get() io.Reader { + dec, _ := p.pool.Get().(io.Reader) + return dec +} + +func (p *decoderPool) Put(dec io.Reader) { + p.pool.Put(dec) +} + +func newDecoderPool() *decoderPool { + return &decoderPool{ + pool: sync.Pool{}, + } +} + type DecoderMap struct { - m map[rawtopiccommon.Codec]PublicCreateDecoderFunc + m map[rawtopiccommon.Codec]PublicCreateDecoderFunc + dp map[rawtopiccommon.Codec]*decoderPool } func NewDecoderMap() DecoderMap { - return DecoderMap{ - m: map[rawtopiccommon.Codec]PublicCreateDecoderFunc{ - rawtopiccommon.CodecRaw: func(input io.Reader) (io.Reader, error) { - return input, nil - }, - rawtopiccommon.CodecGzip: func(input io.Reader) (io.Reader, error) { - return gzip.NewReader(input) - }, - }, + dm := DecoderMap{ + m: make(map[rawtopiccommon.Codec]PublicCreateDecoderFunc), + dp: make(map[rawtopiccommon.Codec]*decoderPool), } + + dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (io.Reader, error) { + return input, nil + }) + + dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (io.Reader, error) { + return gzip.NewReader(input) + }) + + return dm } func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc PublicCreateDecoderFunc) { m.m[codec] = createFunc + m.dp[codec] = newDecoderPool() } func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Reader, error) { if f := m.m[codec]; f != nil { - return f(input) + decoder := m.dp[codec].Get() + if decoder == nil { + var err error + decoder, err = f(input) + if err != nil { + return nil, err + } + } + return decoder, nil } return nil, xerrors.WithStackTrace(xerrors.Wrap( diff --git a/internal/topic/topicreadercommon/decoders_test.go b/internal/topic/topicreadercommon/decoders_test.go new file mode 100644 index 000000000..04b97d9b1 --- /dev/null +++ b/internal/topic/topicreadercommon/decoders_test.go @@ -0,0 +1,51 @@ +package topicreadercommon + +import ( + "bytes" + "compress/gzip" + "errors" + "io" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" +) + +func TestDecoderMap(t *testing.T) { + decoderMap := NewDecoderMap() + + t.Run("DecodeRaw", func(t *testing.T) { + data := []byte("test data") + reader := bytes.NewReader(data) + + decodedReader, err := decoderMap.Decode(rawtopiccommon.CodecRaw, reader) + require.NoError(t, err) + + result, err := io.ReadAll(decodedReader) + require.NoError(t, err) + require.Equal(t, data, result) + }) + + t.Run("DecodeGzip", func(t *testing.T) { + data := []byte("test data") + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decodedReader, err := decoderMap.Decode(rawtopiccommon.CodecGzip, &buf) + require.NoError(t, err) + + result, err := io.ReadAll(decodedReader) + require.NoError(t, err) + require.Equal(t, data, result) + }) + + t.Run("DecodeUnknownCodec", func(t *testing.T) { + _, err := decoderMap.Decode(rawtopiccommon.Codec(999), bytes.NewReader([]byte{})) + require.Error(t, err) + require.True(t, errors.Is(err, ErrPublicUnexpectedCodec)) + }) +} From 2e85a8ec46d99015e66059ad9dbcf5e91f023d80 Mon Sep 17 00:00:00 2001 From: DLC Date: Sun, 16 Feb 2025 23:16:48 +0300 Subject: [PATCH 02/17] fix: nlreturn linter --- internal/topic/topicreadercommon/decoders.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index 8c2394ebb..5ef0c17f6 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -17,6 +17,7 @@ type decoderPool struct { func (p *decoderPool) Get() io.Reader { dec, _ := p.pool.Get().(io.Reader) + return dec } @@ -25,6 +26,7 @@ func (p *decoderPool) Put(dec io.Reader) { } func newDecoderPool() *decoderPool { + return &decoderPool{ pool: sync.Pool{}, } From 462ed6adb138f70003df6cbc503eb1e2d9796bf1 Mon Sep 17 00:00:00 2001 From: DLC Date: Mon, 17 Feb 2025 01:37:31 +0300 Subject: [PATCH 03/17] fix linters --- internal/topic/topicreadercommon/decoders.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index 5ef0c17f6..f8186ea70 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -17,7 +17,7 @@ type decoderPool struct { func (p *decoderPool) Get() io.Reader { dec, _ := p.pool.Get().(io.Reader) - + return dec } @@ -26,7 +26,6 @@ func (p *decoderPool) Put(dec io.Reader) { } func newDecoderPool() *decoderPool { - return &decoderPool{ pool: sync.Pool{}, } @@ -44,10 +43,12 @@ func NewDecoderMap() DecoderMap { } dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (io.Reader, error) { + return input, nil }) dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (io.Reader, error) { + return gzip.NewReader(input) }) @@ -66,9 +67,11 @@ func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Rea var err error decoder, err = f(input) if err != nil { + return nil, err } } + return decoder, nil } From 52e03fa3b9488b46f2c8eb103cb87248584cc801 Mon Sep 17 00:00:00 2001 From: DLC Date: Mon, 17 Feb 2025 01:44:46 +0300 Subject: [PATCH 04/17] fix linter --- internal/topic/topicreadercommon/decoders.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index f8186ea70..013d962f9 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -43,12 +43,10 @@ func NewDecoderMap() DecoderMap { } dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (io.Reader, error) { - return input, nil }) dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (io.Reader, error) { - return gzip.NewReader(input) }) @@ -67,7 +65,6 @@ func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Rea var err error decoder, err = f(input) if err != nil { - return nil, err } } From 1ead17ebc36b068839358881581943057679ecf3 Mon Sep 17 00:00:00 2001 From: DLC Date: Mon, 17 Feb 2025 01:54:10 +0300 Subject: [PATCH 05/17] docs: updated CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6aaf9a74f..a89cdfa40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Supported pool of decoders * Added virtualtimestamps field to cdc description ## v3.99.10 From 7f0a56759c053bd283ba01eb640f492a0d4b34fa Mon Sep 17 00:00:00 2001 From: DLC Date: Mon, 10 Mar 2025 18:51:27 +0300 Subject: [PATCH 06/17] feat: reuse decoder pool properly --- CHANGELOG.md | 25 ++++++ internal/topic/topicreadercommon/decoders.go | 90 ++++++++++++++------ 2 files changed, 91 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a89cdfa40..3bffcf503 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,29 @@ * Supported pool of decoders + +* ## v3.101.0 +* Added `table.Client.ReadRows` method with internal retries + +## v3.100.3 +* Fixed bug with concurrent rewrites source slice of `grpc.DialOption` on dial step + +## v3.100.2 +* Fixed bug in `internal/xcontext.WithDone` (not listening chan done) + +## v3.100.1 +* Refactored behaviour on `retry.Retryable` error for retry object (such as session, connection or transaction) + +## v3.100.0 +* Added `table.DescribeTable.StoreType` to table description result from `table.Session.DescribeTable` request + +## v3.99.13 +* Added checking errors for conditionally delete item from pool + +## v3.99.12 +* Internal debug improved + +## v3.99.11 +* Added stacktrace record to row scan errors for detect broken client code +* Fixed DescribeConsumer ignoring PartitionConsumerStats * Added virtualtimestamps field to cdc description ## v3.99.10 diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index 013d962f9..31e51a591 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -11,17 +11,22 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" ) +type ReadResetter interface { + io.Reader + Reset(r io.Reader) error +} + type decoderPool struct { pool sync.Pool } -func (p *decoderPool) Get() io.Reader { - dec, _ := p.pool.Get().(io.Reader) +func (p *decoderPool) Get() ReadResetter { + dec, _ := p.pool.Get().(ReadResetter) return dec } -func (p *decoderPool) Put(dec io.Reader) { +func (p *decoderPool) Put(dec ReadResetter) { p.pool.Put(dec) } @@ -32,52 +37,89 @@ func newDecoderPool() *decoderPool { } type DecoderMap struct { - m map[rawtopiccommon.Codec]PublicCreateDecoderFunc + m map[rawtopiccommon.Codec]func(io.Reader) (ReadResetter, error) dp map[rawtopiccommon.Codec]*decoderPool } func NewDecoderMap() DecoderMap { dm := DecoderMap{ - m: make(map[rawtopiccommon.Codec]PublicCreateDecoderFunc), + m: make(map[rawtopiccommon.Codec]func(io.Reader) (ReadResetter, error)), dp: make(map[rawtopiccommon.Codec]*decoderPool), } - dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (io.Reader, error) { - return input, nil + dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (ReadResetter, error) { + return &nopResetter{Reader: input}, nil }) - dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (io.Reader, error) { - return gzip.NewReader(input) + dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (ReadResetter, error) { + gz, err := gzip.NewReader(input) + if err != nil { + return nil, err + } + return gz, nil }) return dm } -func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc PublicCreateDecoderFunc) { +func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc func(io.Reader) (ReadResetter, error)) { m.m[codec] = createFunc m.dp[codec] = newDecoderPool() } +type pooledDecoder struct { + ReadResetter + pool *decoderPool +} + +func (p *pooledDecoder) Close() error { + if closer, ok := p.ReadResetter.(io.Closer); ok { + closer.Close() + } + p.pool.Put(p.ReadResetter) + + return nil +} + func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Reader, error) { - if f := m.m[codec]; f != nil { - decoder := m.dp[codec].Get() - if decoder == nil { - var err error - decoder, err = f(input) - if err != nil { - return nil, err - } - } + createFunc, ok := m.m[codec] + if !ok { + return nil, xerrors.WithStackTrace(xerrors.Wrap( + fmt.Errorf("ydb: failed decompress message with codec %v: %w", codec, ErrPublicUnexpectedCodec), + )) + } - return decoder, nil + pool := m.dp[codec] + decoder := pool.Get() + if decoder == nil { + var err error + decoder, err = createFunc(input) + if err != nil { + return nil, err + } + } else { + if err := decoder.Reset(input); err != nil { + return nil, err + } } - return nil, xerrors.WithStackTrace(xerrors.Wrap( - fmt.Errorf("ydb: failed decompress message with codec %v: %w", codec, ErrPublicUnexpectedCodec), - )) + return &pooledDecoder{ + ReadResetter: decoder, + pool: pool, + }, nil +} + +type nopResetter struct { + io.Reader +} + +func (n *nopResetter) Reset(r io.Reader) error { + n.Reader = r + + return nil } -type PublicCreateDecoderFunc func(input io.Reader) (io.Reader, error) +type PublicCreateDecoderFunc func(input io.Reader) (ReadResetter, error) // ErrPublicUnexpectedCodec return when try to read message content with unknown codec var ErrPublicUnexpectedCodec = xerrors.Wrap(errors.New("ydb: unexpected codec")) From aead80478419a81a1ec5c4c806e19a07e3a43861 Mon Sep 17 00:00:00 2001 From: DLC Date: Mon, 10 Mar 2025 18:57:31 +0300 Subject: [PATCH 07/17] docs: fix CHANGRLO.md conflicts --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bffcf503..4b7baf30d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ * Supported pool of decoders -* ## v3.101.0 +## v3.101.0 * Added `table.Client.ReadRows` method with internal retries ## v3.100.3 From 1c356ada846a0d4607dd954913a1ce339c1ab4d1 Mon Sep 17 00:00:00 2001 From: DLC Date: Mon, 10 Mar 2025 19:40:30 +0300 Subject: [PATCH 08/17] fix: linter nlreturn --- internal/topic/topicreadercommon/decoders.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index 31e51a591..5315f4248 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -56,6 +56,7 @@ func NewDecoderMap() DecoderMap { if err != nil { return nil, err } + return gz, nil }) From 82cb4208e35f0cfad6a6aa3ce2ac6dc1b4232aca Mon Sep 17 00:00:00 2001 From: DLC Date: Tue, 11 Mar 2025 18:47:08 +0300 Subject: [PATCH 09/17] refactor(decoder): enforce pool usage in Decode for all codecs --- internal/topic/topicreadercommon/decoders.go | 43 +++++++++++-------- .../topic/topicreadercommon/decoders_test.go | 39 +++++++++++++++++ 2 files changed, 63 insertions(+), 19 deletions(-) diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index 5315f4248..14e6c0c82 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -37,21 +37,21 @@ func newDecoderPool() *decoderPool { } type DecoderMap struct { - m map[rawtopiccommon.Codec]func(io.Reader) (ReadResetter, error) + m map[rawtopiccommon.Codec]PublicCreateDecoderFunc dp map[rawtopiccommon.Codec]*decoderPool } func NewDecoderMap() DecoderMap { dm := DecoderMap{ - m: make(map[rawtopiccommon.Codec]func(io.Reader) (ReadResetter, error)), + m: make(map[rawtopiccommon.Codec]PublicCreateDecoderFunc), dp: make(map[rawtopiccommon.Codec]*decoderPool), } - dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (ReadResetter, error) { + dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (io.Reader, error) { return &nopResetter{Reader: input}, nil }) - dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (ReadResetter, error) { + dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (io.Reader, error) { gz, err := gzip.NewReader(input) if err != nil { return nil, err @@ -63,7 +63,7 @@ func NewDecoderMap() DecoderMap { return dm } -func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc func(io.Reader) (ReadResetter, error)) { +func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc func(io.Reader) (io.Reader, error)) { m.m[codec] = createFunc m.dp[codec] = newDecoderPool() } @@ -92,22 +92,27 @@ func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Rea pool := m.dp[codec] decoder := pool.Get() - if decoder == nil { - var err error - decoder, err = createFunc(input) - if err != nil { - return nil, err - } - } else { - if err := decoder.Reset(input); err != nil { - return nil, err + if decoder != nil { + if resetter, ok := decoder.(ReadResetter); ok { + if err := resetter.Reset(input); err != nil { + return nil, err + } + + return &pooledDecoder{ReadResetter: resetter, pool: pool}, nil } + + return decoder, nil + } + + newDecoder, err := createFunc(input) + if err != nil { + return nil, err + } + if resetter, ok := newDecoder.(ReadResetter); ok { + return &pooledDecoder{ReadResetter: resetter, pool: pool}, nil } - return &pooledDecoder{ - ReadResetter: decoder, - pool: pool, - }, nil + return newDecoder, nil } type nopResetter struct { @@ -120,7 +125,7 @@ func (n *nopResetter) Reset(r io.Reader) error { return nil } -type PublicCreateDecoderFunc func(input io.Reader) (ReadResetter, error) +type PublicCreateDecoderFunc func(input io.Reader) (io.Reader, error) // ErrPublicUnexpectedCodec return when try to read message content with unknown codec var ErrPublicUnexpectedCodec = xerrors.Wrap(errors.New("ydb: unexpected codec")) diff --git a/internal/topic/topicreadercommon/decoders_test.go b/internal/topic/topicreadercommon/decoders_test.go index 04b97d9b1..0c78c29d2 100644 --- a/internal/topic/topicreadercommon/decoders_test.go +++ b/internal/topic/topicreadercommon/decoders_test.go @@ -48,4 +48,43 @@ func TestDecoderMap(t *testing.T) { require.Error(t, err) require.True(t, errors.Is(err, ErrPublicUnexpectedCodec)) }) + + t.Run("DecodeCustomCodec", func(t *testing.T) { + dm := NewDecoderMap() + customCodec := rawtopiccommon.Codec(1001) + dm.AddDecoder(customCodec, func(input io.Reader) (io.Reader, error) { + return gzip.NewReader(input) + }) + require.Len(t, dm.dp, 3) + + data := []byte("custom test data") + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decodedReader, err := dm.Decode(customCodec, &buf) + require.NoError(t, err) + defer decodedReader.(io.Closer).Close() + + result, err := io.ReadAll(decodedReader) + require.NoError(t, err) + require.Equal(t, string(data), string(result)) + + data2 := []byte("second test data") + var buf2 bytes.Buffer + gzipWriter2 := gzip.NewWriter(&buf2) + _, err = gzipWriter2.Write(data2) + require.NoError(t, err) + require.NoError(t, gzipWriter2.Close()) + + decodedReader2, err := dm.Decode(customCodec, &buf2) + require.NoError(t, err) + defer decodedReader2.(io.Closer).Close() + + result2, err := io.ReadAll(decodedReader2) + require.NoError(t, err) + require.Equal(t, string(data2), string(result2)) + }) } From fd95285674e86bf9b8b1a13bdc481b61eaaa9ee8 Mon Sep 17 00:00:00 2001 From: DLC Date: Tue, 11 Mar 2025 19:01:28 +0300 Subject: [PATCH 10/17] refactor(decoder): extend pool to custom codecs --- internal/topic/topicreadercommon/decoders.go | 45 +++++++++---------- .../topic/topicreadercommon/decoders_test.go | 2 +- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index 14e6c0c82..ae02e21e6 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -47,11 +47,11 @@ func NewDecoderMap() DecoderMap { dp: make(map[rawtopiccommon.Codec]*decoderPool), } - dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (io.Reader, error) { + dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (ReadResetter, error) { return &nopResetter{Reader: input}, nil }) - dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (io.Reader, error) { + dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (ReadResetter, error) { gz, err := gzip.NewReader(input) if err != nil { return nil, err @@ -63,11 +63,6 @@ func NewDecoderMap() DecoderMap { return dm } -func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc func(io.Reader) (io.Reader, error)) { - m.m[codec] = createFunc - m.dp[codec] = newDecoderPool() -} - type pooledDecoder struct { ReadResetter pool *decoderPool @@ -92,27 +87,27 @@ func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Rea pool := m.dp[codec] decoder := pool.Get() - if decoder != nil { - if resetter, ok := decoder.(ReadResetter); ok { - if err := resetter.Reset(input); err != nil { - return nil, err - } - - return &pooledDecoder{ReadResetter: resetter, pool: pool}, nil + if decoder == nil { + var err error + decoder, err = createFunc(input) + if err != nil { + return nil, err + } + } else { + if err := decoder.Reset(input); err != nil { + return nil, err } - - return decoder, nil } - newDecoder, err := createFunc(input) - if err != nil { - return nil, err - } - if resetter, ok := newDecoder.(ReadResetter); ok { - return &pooledDecoder{ReadResetter: resetter, pool: pool}, nil - } + return &pooledDecoder{ + ReadResetter: decoder, + pool: pool, + }, nil +} - return newDecoder, nil +func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc func(io.Reader) (ReadResetter, error)) { + m.m[codec] = createFunc + m.dp[codec] = newDecoderPool() } type nopResetter struct { @@ -125,7 +120,7 @@ func (n *nopResetter) Reset(r io.Reader) error { return nil } -type PublicCreateDecoderFunc func(input io.Reader) (io.Reader, error) +type PublicCreateDecoderFunc func(input io.Reader) (ReadResetter, error) // ErrPublicUnexpectedCodec return when try to read message content with unknown codec var ErrPublicUnexpectedCodec = xerrors.Wrap(errors.New("ydb: unexpected codec")) diff --git a/internal/topic/topicreadercommon/decoders_test.go b/internal/topic/topicreadercommon/decoders_test.go index 0c78c29d2..dd0699047 100644 --- a/internal/topic/topicreadercommon/decoders_test.go +++ b/internal/topic/topicreadercommon/decoders_test.go @@ -52,7 +52,7 @@ func TestDecoderMap(t *testing.T) { t.Run("DecodeCustomCodec", func(t *testing.T) { dm := NewDecoderMap() customCodec := rawtopiccommon.Codec(1001) - dm.AddDecoder(customCodec, func(input io.Reader) (io.Reader, error) { + dm.AddDecoder(customCodec, func(input io.Reader) (ReadResetter, error) { return gzip.NewReader(input) }) require.Len(t, dm.dp, 3) From 1bd0eb0e18025584c5a5c7ef788018c21d1af627 Mon Sep 17 00:00:00 2001 From: DLC Date: Thu, 13 Mar 2025 19:57:13 +0300 Subject: [PATCH 11/17] test(decoder): update TestDecoderMap with pool test --- .../topic/topicreadercommon/decoders_test.go | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/internal/topic/topicreadercommon/decoders_test.go b/internal/topic/topicreadercommon/decoders_test.go index dd0699047..6ab145286 100644 --- a/internal/topic/topicreadercommon/decoders_test.go +++ b/internal/topic/topicreadercommon/decoders_test.go @@ -87,4 +87,82 @@ func TestDecoderMap(t *testing.T) { require.NoError(t, err) require.Equal(t, string(data2), string(result2)) }) + + t.Run("DecodeCustomCodec", func(t *testing.T) { + dm := NewDecoderMap() + customCodec := rawtopiccommon.Codec(1001) + dm.AddDecoder(customCodec, func(input io.Reader) (ReadResetter, error) { + return gzip.NewReader(input) + }) + require.Len(t, dm.dp, 3) + + data := []byte("custom test data") + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decodedReader, err := dm.Decode(customCodec, &buf) + require.NoError(t, err) + result, err := io.ReadAll(decodedReader) + require.NoError(t, err) + require.Equal(t, string(data), string(result)) + require.NoError(t, decodedReader.(io.Closer).Close()) // Явный вызов Close вместо defer + + data2 := []byte("second test data") + var buf2 bytes.Buffer + gzipWriter2 := gzip.NewWriter(&buf2) + _, err = gzipWriter2.Write(data2) + require.NoError(t, err) + require.NoError(t, gzipWriter2.Close()) + + decodedReader2, err := dm.Decode(customCodec, &buf2) + require.NoError(t, err) + result2, err := io.ReadAll(decodedReader2) + require.NoError(t, err) + require.Equal(t, string(data2), string(result2)) + require.NoError(t, decodedReader2.(io.Closer).Close()) // Явный вызов Close вместо defer + }) + + t.Run("PoolReuse", func(t *testing.T) { + dm := NewDecoderMap() + customCodec := rawtopiccommon.Codec(1002) + + dm.AddDecoder(customCodec, func(input io.Reader) (ReadResetter, error) { + return gzip.NewReader(input) + }) + + data1 := []byte("hello") + var buf1 bytes.Buffer + gzipWriter1 := gzip.NewWriter(&buf1) + _, err := gzipWriter1.Write(data1) + require.NoError(t, err) + require.NoError(t, gzipWriter1.Close()) + + reader1, err := dm.Decode(customCodec, &buf1) + require.NoError(t, err, "first decoding should succeed") + result1, err := io.ReadAll(reader1) + require.NoError(t, err, "reading first message should succeed") + require.Equal(t, string(data1), string(result1), "data should match") + require.NoError(t, reader1.(io.Closer).Close(), "closing first reader should succeed") + + pool := dm.dp[customCodec] + reusedDecoder := pool.Get() + require.NotNil(t, reusedDecoder, "decoder should be returned to pool after Close") + + data2 := []byte("world") + var buf2 bytes.Buffer + gzipWriter2 := gzip.NewWriter(&buf2) + _, err = gzipWriter2.Write(data2) + require.NoError(t, err) + require.NoError(t, gzipWriter2.Close()) + + reader2, err := dm.Decode(customCodec, &buf2) + require.NoError(t, err, "second decoding should succeed") + result2, err := io.ReadAll(reader2) + require.NoError(t, err, "reading second message should succeed") + require.Equal(t, string(data2), string(result2), "data of second message should match") + require.NoError(t, reader2.(io.Closer).Close(), "closing second reader should succeed") + }) } From 9e6a2375e873471bbb09422ec09e7c124c57d36c Mon Sep 17 00:00:00 2001 From: DLC Date: Wed, 19 Mar 2025 20:48:52 +0300 Subject: [PATCH 12/17] feat(topic): reuse decoders via sync.Pool in topicreadercommon - Implement decoder pooling with sync.Pool. - Add Close() methods to PublicMessage and oneTimeReader for proper resource management. - Update tests to verify correct decoder reuse and resource release after Close() calls. --- internal/topic/topicreadercommon/decoders.go | 7 +- .../topic/topicreadercommon/decoders_test.go | 4 +- internal/topic/topicreadercommon/message.go | 17 ++- .../topic/topicreadercommon/message_test.go | 139 ++++++++++++++++++ .../topicreadercommon/one_time_reader.go | 20 +++ .../topicreadercommon/one_time_reader_test.go | 84 ++++++++++- 6 files changed, 259 insertions(+), 12 deletions(-) create mode 100644 internal/topic/topicreadercommon/message_test.go diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index ae02e21e6..3df9fc8f2 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -21,9 +21,10 @@ type decoderPool struct { } func (p *decoderPool) Get() ReadResetter { - dec, _ := p.pool.Get().(ReadResetter) - - return dec + if v := p.pool.Get(); v != nil { + return v.(ReadResetter) + } + return nil } func (p *decoderPool) Put(dec ReadResetter) { diff --git a/internal/topic/topicreadercommon/decoders_test.go b/internal/topic/topicreadercommon/decoders_test.go index 6ab145286..9e9fce2fa 100644 --- a/internal/topic/topicreadercommon/decoders_test.go +++ b/internal/topic/topicreadercommon/decoders_test.go @@ -108,7 +108,7 @@ func TestDecoderMap(t *testing.T) { result, err := io.ReadAll(decodedReader) require.NoError(t, err) require.Equal(t, string(data), string(result)) - require.NoError(t, decodedReader.(io.Closer).Close()) // Явный вызов Close вместо defer + require.NoError(t, decodedReader.(io.Closer).Close()) data2 := []byte("second test data") var buf2 bytes.Buffer @@ -122,7 +122,7 @@ func TestDecoderMap(t *testing.T) { result2, err := io.ReadAll(decodedReader2) require.NoError(t, err) require.Equal(t, string(data2), string(result2)) - require.NoError(t, decodedReader2.(io.Closer).Close()) // Явный вызов Close вместо defer + require.NoError(t, decodedReader2.(io.Closer).Close()) }) t.Run("PoolReuse", func(t *testing.T) { diff --git a/internal/topic/topicreadercommon/message.go b/internal/topic/topicreadercommon/message.go index 752b38b78..3314e55dc 100644 --- a/internal/topic/topicreadercommon/message.go +++ b/internal/topic/topicreadercommon/message.go @@ -58,10 +58,10 @@ func (m *PublicMessage) UnmarshalTo(dst PublicMessageContentUnmarshaler) error { if m.dataConsumed { return xerrors.WithStackTrace(errMessageWasReadEarly) } - m.dataConsumed = true - - return callbackOnReaderContent(globalReadMessagePool, m, m.UncompressedSize, dst) + err := callbackOnReaderContent(globalReadMessagePool, m, m.UncompressedSize, dst) + m.data.Close() + return err } // Read implements io.Reader @@ -73,7 +73,16 @@ func (m *PublicMessage) UnmarshalTo(dst PublicMessageContentUnmarshaler) error { func (m *PublicMessage) Read(p []byte) (n int, err error) { m.dataConsumed = true - return m.data.Read(p) + n, err = m.data.Read(p) + if err != nil { + m.data.Close() + } + + return n, err +} + +func (m *PublicMessage) Close() error { + return m.data.Close() } // PublicMessageContentUnmarshaler is interface for unmarshal message content diff --git a/internal/topic/topicreadercommon/message_test.go b/internal/topic/topicreadercommon/message_test.go new file mode 100644 index 000000000..237305f92 --- /dev/null +++ b/internal/topic/topicreadercommon/message_test.go @@ -0,0 +1,139 @@ +package topicreadercommon + +import ( + "bytes" + "compress/gzip" + "io" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" +) + +func TestPublicMessage(t *testing.T) { + t.Run("DecoderClosesAfterFullRead", func(t *testing.T) { + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err := gw.Write([]byte("test")) + require.NoError(t, err) + require.NoError(t, gw.Close()) + + gzipReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + msg := &PublicMessage{data: newOneTimeReaderFromReader(gzipReader)} + + _, err = io.ReadAll(msg) + require.NoError(t, err, "ReadAll() should complete without errors") + + _, err = gzipReader.Read([]byte{0}) + require.Error(t, err, "gzip.Reader should be closed after full read") + }) + + t.Run("DecoderNotClosedBeforeEOF", func(t *testing.T) { + data := []byte("test") + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + gzipReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + msg := &PublicMessage{data: newOneTimeReaderFromReader(gzipReader)} + + readBuf := make([]byte, 10) + n, err := msg.Read(readBuf) + require.Equal(t, io.EOF, err, "gzip.Reader returns EOF immediately after last byte") + require.Equal(t, len(data), n, "should read all data at once") + + require.NoError(t, msg.Close(), "explicit Close after EOF should succeed") + }) + + t.Run("ReadAfterCloseReturnsEOF", func(t *testing.T) { + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err := gw.Write([]byte("test")) + require.NoError(t, err) + require.NoError(t, gw.Close()) + + gzipReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + msg := &PublicMessage{data: newOneTimeReaderFromReader(gzipReader)} + + require.NoError(t, msg.Close(), "Close() should execute without errors") + + readBuf := make([]byte, 2) + n, err := msg.Read(readBuf) + require.Equal(t, 0, n, "After Close(), Read() should return 0 bytes") + require.Equal(t, io.EOF, err, "After Close(), Read() should return EOF") + }) + + t.Run("DecoderClosesAfterReadToEOF", func(t *testing.T) { + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err := gw.Write([]byte("test")) + require.NoError(t, err) + require.NoError(t, gw.Close()) + + gzipReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + msg := &PublicMessage{data: newOneTimeReaderFromReader(gzipReader)} + + _, err = io.ReadAll(msg) + require.NoError(t, err) + + _, err = gzipReader.Read([]byte{0}) + require.Error(t, err, "gzip.Reader should be closed after full read to EOF") + }) + + t.Run("DecoderReuseFromPool", func(t *testing.T) { + dm := NewDecoderMap() + customCodec := rawtopiccommon.Codec(1006) + + dm.AddDecoder(customCodec, func(input io.Reader) (ReadResetter, error) { + return gzip.NewReader(input) + }) + + var buf1 bytes.Buffer + gw1 := gzip.NewWriter(&buf1) + _, err := gw1.Write([]byte("message")) + require.NoError(t, err) + require.NoError(t, gw1.Close()) + + reader1, err := dm.Decode(customCodec, &buf1) + require.NoError(t, err) + + data1, err := io.ReadAll(reader1) + require.NoError(t, err) + require.Equal(t, "message", string(data1)) + + require.NoError(t, reader1.(io.Closer).Close()) + + pool := dm.dp[customCodec] + reusedDecoder := pool.Get() + require.NotNil(t, reusedDecoder, "Decoder должен вернуться в пул после Close") + + pool.Put(reusedDecoder) + + var buf2 bytes.Buffer + gw2 := gzip.NewWriter(&buf2) + _, err = gw2.Write([]byte("message2")) + require.NoError(t, err) + require.NoError(t, gw2.Close()) + + reader2, err := dm.Decode(customCodec, &buf2) + require.NoError(t, err) + + data2, err := io.ReadAll(reader2) + require.NoError(t, err) + require.Equal(t, "message2", string(data2)) + + require.NoError(t, reader2.(io.Closer).Close()) + }) + +} diff --git a/internal/topic/topicreadercommon/one_time_reader.go b/internal/topic/topicreadercommon/one_time_reader.go index ee9a05789..c06119ea2 100644 --- a/internal/topic/topicreadercommon/one_time_reader.go +++ b/internal/topic/topicreadercommon/one_time_reader.go @@ -44,3 +44,23 @@ func (s *oneTimeReader) Read(p []byte) (n int, err error) { return n, err } + +func (s *oneTimeReader) Close() error { + if s.err != nil && s.err != io.EOF { + return s.err + } + if s.reader == nil { + s.reader = s.readerMaker() + } + if closer, ok := s.reader.(io.Closer); ok { + err := closer.Close() + if err != nil { + s.err = err + s.reader = nil + return err + } + } + s.reader = nil + s.err = io.EOF + return nil +} diff --git a/internal/topic/topicreadercommon/one_time_reader_test.go b/internal/topic/topicreadercommon/one_time_reader_test.go index bc8510279..538021c7d 100644 --- a/internal/topic/topicreadercommon/one_time_reader_test.go +++ b/internal/topic/topicreadercommon/one_time_reader_test.go @@ -2,6 +2,7 @@ package topicreadercommon import ( "bytes" + "compress/gzip" "errors" "io" "testing" @@ -68,7 +69,6 @@ func TestOneTimeReader(t *testing.T) { } r.reader = iotest.TimeoutReader(bytes.NewReader(preparedData)) - // first read is ok firstBuf := make([]byte, bufSize) n, err := r.Read(firstBuf) require.NoError(t, err) @@ -76,16 +76,94 @@ func TestOneTimeReader(t *testing.T) { require.Equal(t, preparedData[:bufSize], firstBuf) require.NoError(t, err) - // iotest.TimeoutReader return timeout for second read secondBuf := make([]byte, bufSize) n, err = r.Read(secondBuf) require.Equal(t, err, iotest.ErrTimeout) require.Equal(t, 0, n) require.Equal(t, make([]byte, bufSize), secondBuf) - // Next read again n, err = r.Read(secondBuf) require.Equal(t, err, iotest.ErrTimeout) require.Equal(t, 0, n) }) + + t.Run("CloseWithoutRead", func(t *testing.T) { + reader := newOneTimeReaderFromReader(bytes.NewReader([]byte("test"))) + err := reader.Close() + require.NoError(t, err) + }) + + t.Run("CloseTwice", func(t *testing.T) { + reader := newOneTimeReaderFromReader(bytes.NewReader([]byte("test"))) + require.NoError(t, reader.Close()) + require.NoError(t, reader.Close()) + }) + + t.Run("CloseReleasesResourcesWithGzipDecoder", func(t *testing.T) { + data := []byte("test data for gzip") + var buf bytes.Buffer + gzWriter := gzip.NewWriter(&buf) + _, err := gzWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzWriter.Close()) + + gzReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + reader := newOneTimeReaderFromReader(gzReader) + + tmpBuf := make([]byte, 4) + _, err = reader.Read(tmpBuf) + require.NoError(t, err) + + err = reader.Close() + require.NoError(t, err, "Close() should not return error for gzip.Reader") + + n, err := reader.Read(tmpBuf) + require.Equal(t, 0, n, "After Close(), read should return 0 bytes") + require.ErrorIs(t, err, io.EOF, "After Close(), read should return EOF") + }) + + t.Run("ReadAfterCloseReturnsEOFWithGzip", func(t *testing.T) { + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write([]byte("gzip data")) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + gzipReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + r := newOneTimeReaderFromReader(gzipReader) + require.NoError(t, r.Close(), "Close() should succeed") + + readBuf := make([]byte, 2) + n, err := r.Read(readBuf) + require.Equal(t, 0, n, "Read() after Close() should return 0 bytes") + require.Equal(t, io.EOF, err, "Read() after Close() should return EOF") + }) + + t.Run("GzipDecoderReturnedToPoolAfterClose", func(t *testing.T) { + pool := newDecoderPool() + + data := []byte("pool reuse test") + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decoder, err := gzip.NewReader(&buf) + require.NoError(t, err) + + reader := newOneTimeReaderFromReader(decoder) + require.NoError(t, reader.Close(), "Close() should not return error") + + pool.Put(decoder) + + reusedDecoder := pool.Get() + require.NotNil(t, reusedDecoder, "Decoder should be retrieved from pool after Close") + require.Equal(t, decoder, reusedDecoder, "Same gzip.Reader instance should be reused from pool") + }) + } From dc5cdd1beac66dd6b08dfc239a722941a664de25 Mon Sep 17 00:00:00 2001 From: DLC Date: Wed, 19 Mar 2025 22:38:52 +0300 Subject: [PATCH 13/17] style: fix linters --- internal/topic/topicreadercommon/decoders.go | 7 +++---- internal/topic/topicreadercommon/message.go | 1 + internal/topic/topicreadercommon/message_test.go | 3 +-- internal/topic/topicreadercommon/one_time_reader.go | 5 ++++- internal/topic/topicreadercommon/one_time_reader_test.go | 1 - 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index 3df9fc8f2..ae02e21e6 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -21,10 +21,9 @@ type decoderPool struct { } func (p *decoderPool) Get() ReadResetter { - if v := p.pool.Get(); v != nil { - return v.(ReadResetter) - } - return nil + dec, _ := p.pool.Get().(ReadResetter) + + return dec } func (p *decoderPool) Put(dec ReadResetter) { diff --git a/internal/topic/topicreadercommon/message.go b/internal/topic/topicreadercommon/message.go index 3314e55dc..439327748 100644 --- a/internal/topic/topicreadercommon/message.go +++ b/internal/topic/topicreadercommon/message.go @@ -61,6 +61,7 @@ func (m *PublicMessage) UnmarshalTo(dst PublicMessageContentUnmarshaler) error { m.dataConsumed = true err := callbackOnReaderContent(globalReadMessagePool, m, m.UncompressedSize, dst) m.data.Close() + return err } diff --git a/internal/topic/topicreadercommon/message_test.go b/internal/topic/topicreadercommon/message_test.go index 237305f92..b62598cc1 100644 --- a/internal/topic/topicreadercommon/message_test.go +++ b/internal/topic/topicreadercommon/message_test.go @@ -116,7 +116,7 @@ func TestPublicMessage(t *testing.T) { pool := dm.dp[customCodec] reusedDecoder := pool.Get() - require.NotNil(t, reusedDecoder, "Decoder должен вернуться в пул после Close") + require.NotNil(t, reusedDecoder, "Decoder should be retrieved from pool after Close") pool.Put(reusedDecoder) @@ -135,5 +135,4 @@ func TestPublicMessage(t *testing.T) { require.NoError(t, reader2.(io.Closer).Close()) }) - } diff --git a/internal/topic/topicreadercommon/one_time_reader.go b/internal/topic/topicreadercommon/one_time_reader.go index c06119ea2..794f56874 100644 --- a/internal/topic/topicreadercommon/one_time_reader.go +++ b/internal/topic/topicreadercommon/one_time_reader.go @@ -1,6 +1,7 @@ package topicreadercommon import ( + "errors" "io" ) @@ -46,7 +47,7 @@ func (s *oneTimeReader) Read(p []byte) (n int, err error) { } func (s *oneTimeReader) Close() error { - if s.err != nil && s.err != io.EOF { + if s.err != nil && !errors.Is(s.err, io.EOF) { return s.err } if s.reader == nil { @@ -57,10 +58,12 @@ func (s *oneTimeReader) Close() error { if err != nil { s.err = err s.reader = nil + return err } } s.reader = nil s.err = io.EOF + return nil } diff --git a/internal/topic/topicreadercommon/one_time_reader_test.go b/internal/topic/topicreadercommon/one_time_reader_test.go index 538021c7d..bec395620 100644 --- a/internal/topic/topicreadercommon/one_time_reader_test.go +++ b/internal/topic/topicreadercommon/one_time_reader_test.go @@ -165,5 +165,4 @@ func TestOneTimeReader(t *testing.T) { require.NotNil(t, reusedDecoder, "Decoder should be retrieved from pool after Close") require.Equal(t, decoder, reusedDecoder, "Same gzip.Reader instance should be reused from pool") }) - } From e0f02e5c27925a5d63d9b553ac3876bc92f2776d Mon Sep 17 00:00:00 2001 From: DLC Date: Wed, 19 Mar 2025 23:41:03 +0300 Subject: [PATCH 14/17] chore: fix gzip pooling tests by calling Reset after Close --- internal/topic/topicreadercommon/decoders_test.go | 8 ++++---- .../topicreadercommon/one_time_reader_test.go | 14 +++++++++++++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/internal/topic/topicreadercommon/decoders_test.go b/internal/topic/topicreadercommon/decoders_test.go index 9e9fce2fa..bb2fa6779 100644 --- a/internal/topic/topicreadercommon/decoders_test.go +++ b/internal/topic/topicreadercommon/decoders_test.go @@ -142,14 +142,12 @@ func TestDecoderMap(t *testing.T) { reader1, err := dm.Decode(customCodec, &buf1) require.NoError(t, err, "first decoding should succeed") + result1, err := io.ReadAll(reader1) require.NoError(t, err, "reading first message should succeed") require.Equal(t, string(data1), string(result1), "data should match") - require.NoError(t, reader1.(io.Closer).Close(), "closing first reader should succeed") - pool := dm.dp[customCodec] - reusedDecoder := pool.Get() - require.NotNil(t, reusedDecoder, "decoder should be returned to pool after Close") + require.NoError(t, reader1.(io.Closer).Close(), "closing first reader should succeed") data2 := []byte("world") var buf2 bytes.Buffer @@ -160,9 +158,11 @@ func TestDecoderMap(t *testing.T) { reader2, err := dm.Decode(customCodec, &buf2) require.NoError(t, err, "second decoding should succeed") + result2, err := io.ReadAll(reader2) require.NoError(t, err, "reading second message should succeed") require.Equal(t, string(data2), string(result2), "data of second message should match") + require.NoError(t, reader2.(io.Closer).Close(), "closing second reader should succeed") }) } diff --git a/internal/topic/topicreadercommon/one_time_reader_test.go b/internal/topic/topicreadercommon/one_time_reader_test.go index bec395620..e03aec5f0 100644 --- a/internal/topic/topicreadercommon/one_time_reader_test.go +++ b/internal/topic/topicreadercommon/one_time_reader_test.go @@ -163,6 +163,18 @@ func TestOneTimeReader(t *testing.T) { reusedDecoder := pool.Get() require.NotNil(t, reusedDecoder, "Decoder should be retrieved from pool after Close") - require.Equal(t, decoder, reusedDecoder, "Same gzip.Reader instance should be reused from pool") + + var buf2 bytes.Buffer + gzipWriter2 := gzip.NewWriter(&buf2) + _, err = gzipWriter2.Write([]byte("next message")) + require.NoError(t, err) + require.NoError(t, gzipWriter2.Close()) + + err = reusedDecoder.Reset(&buf2) + require.NoError(t, err, "Decoder Reset() должен выполняться успешно") + + result, err := io.ReadAll(reusedDecoder) + require.NoError(t, err) + require.Equal(t, "next message", string(result)) }) } From 99ed885a479b7f920e88fed853f81365fccb9f4c Mon Sep 17 00:00:00 2001 From: DLC Date: Thu, 20 Mar 2025 00:11:16 +0300 Subject: [PATCH 15/17] chore: correct decoder reuse tests by using Decode() for gzip --- .../topic/topicreadercommon/message_test.go | 18 +++++++-------- .../topicreadercommon/one_time_reader_test.go | 22 ++++++++++++------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/internal/topic/topicreadercommon/message_test.go b/internal/topic/topicreadercommon/message_test.go index b62598cc1..fec52fb73 100644 --- a/internal/topic/topicreadercommon/message_test.go +++ b/internal/topic/topicreadercommon/message_test.go @@ -108,17 +108,13 @@ func TestPublicMessage(t *testing.T) { reader1, err := dm.Decode(customCodec, &buf1) require.NoError(t, err) - data1, err := io.ReadAll(reader1) + msg1 := &PublicMessage{data: newOneTimeReaderFromReader(reader1)} + + data1, err := io.ReadAll(msg1) require.NoError(t, err) require.Equal(t, "message", string(data1)) - require.NoError(t, reader1.(io.Closer).Close()) - - pool := dm.dp[customCodec] - reusedDecoder := pool.Get() - require.NotNil(t, reusedDecoder, "Decoder should be retrieved from pool after Close") - - pool.Put(reusedDecoder) + require.NoError(t, msg1.Close()) var buf2 bytes.Buffer gw2 := gzip.NewWriter(&buf2) @@ -129,10 +125,12 @@ func TestPublicMessage(t *testing.T) { reader2, err := dm.Decode(customCodec, &buf2) require.NoError(t, err) - data2, err := io.ReadAll(reader2) + msg2 := &PublicMessage{data: newOneTimeReaderFromReader(reader2)} + + data2, err := io.ReadAll(msg2) require.NoError(t, err) require.Equal(t, "message2", string(data2)) - require.NoError(t, reader2.(io.Closer).Close()) + require.NoError(t, msg2.Close()) }) } diff --git a/internal/topic/topicreadercommon/one_time_reader_test.go b/internal/topic/topicreadercommon/one_time_reader_test.go index e03aec5f0..e0c9dfeb7 100644 --- a/internal/topic/topicreadercommon/one_time_reader_test.go +++ b/internal/topic/topicreadercommon/one_time_reader_test.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/gzip" "errors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "io" "testing" "testing/iotest" @@ -144,7 +145,8 @@ func TestOneTimeReader(t *testing.T) { }) t.Run("GzipDecoderReturnedToPoolAfterClose", func(t *testing.T) { - pool := newDecoderPool() + dm := NewDecoderMap() + codec := rawtopiccommon.CodecGzip data := []byte("pool reuse test") var buf bytes.Buffer @@ -153,16 +155,18 @@ func TestOneTimeReader(t *testing.T) { require.NoError(t, err) require.NoError(t, gzipWriter.Close()) - decoder, err := gzip.NewReader(&buf) + decoder, err := dm.Decode(codec, &buf) require.NoError(t, err) reader := newOneTimeReaderFromReader(decoder) - require.NoError(t, reader.Close(), "Close() should not return error") + _, err = io.ReadAll(&reader) + require.NoError(t, err) - pool.Put(decoder) + require.NoError(t, reader.Close(), "Close() should not return error") - reusedDecoder := pool.Get() + reusedDecoder := dm.dp[codec].Get() require.NotNil(t, reusedDecoder, "Decoder should be retrieved from pool after Close") + dm.dp[codec].Put(reusedDecoder) var buf2 bytes.Buffer gzipWriter2 := gzip.NewWriter(&buf2) @@ -170,11 +174,13 @@ func TestOneTimeReader(t *testing.T) { require.NoError(t, err) require.NoError(t, gzipWriter2.Close()) - err = reusedDecoder.Reset(&buf2) - require.NoError(t, err, "Decoder Reset() должен выполняться успешно") + reader2, err := dm.Decode(codec, &buf2) + require.NoError(t, err) - result, err := io.ReadAll(reusedDecoder) + result, err := io.ReadAll(reader2) require.NoError(t, err) require.Equal(t, "next message", string(result)) + + require.NoError(t, reader2.(io.Closer).Close()) }) } From 132b448d72ab6b858dd62b7d71fd58057299b1bf Mon Sep 17 00:00:00 2001 From: DLC Date: Thu, 20 Mar 2025 02:41:08 +0300 Subject: [PATCH 16/17] chore: resolve conflict in CHANGELOG.md and fix linter --- CHANGELOG.md | 40 +++++++++++++++++++ .../topicreadercommon/one_time_reader_test.go | 3 +- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c70067382..ad5467241 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,46 @@ * Supported pool of decoders +## v3.104.5 +* Added query client session pool metrics: create_in_progress, in_use, waiters_queue +* Added pool item closing for not-alived item + +## v3.104.4 +* Fixed bug with session query latency metric collector + +## v3.104.3 +* Changed argument types in `table.Client.ReadRows` to public types for compatibility with mock-generation + +## v3.104.2 +* Added bindings options into `ydb.ParamsFromMap` for bind wide time types +* Changed `ydb.WithWideTimeTypes(bool)` for allow boolean argument + +## v3.104.1 +* Added export of advanced metric information for QueryService calls + +## v3.104.0 +* Added binding `ydb.WithWideTimeTypes()` which interprets `time.Time` and `time.Duration` as `Timestamp64` and `Interval64` YDB types + +## v3.103.0 +* Supported wide `Interval64` type + +## v3.102.0 +* Supported wide `Date32`, `Datetime64` and `Timestamp64` types + +## v3.101.4 +* Switched internal type of result `ydb.Driver.Query()` from `*internal/query.Client` to `query.Client` interface + +## v3.101.3 +* Added `query.TransactionActor` type alias to `query.TxActor` for compatibility with `table.Client` API's +* Removed comment `experimental` from `ydb.ParamsBuilder` and `ydb.ParamsFromMap` +* Fixed panic on closing `internal/query/sessionCore.done` channel twice +* Fixed hangup when try to send batch of messages with size more, then grpc limits from topic writer internals + +## v3.101.2 +* Added a new metric `ydb_go_sdk_ydb_info` with the current version of the SDK + +## v3.101.1 +* Changed allowBanned=false for preferred node connections ## v3.101.0 * Added `table.Client.ReadRows` method with internal retries diff --git a/internal/topic/topicreadercommon/one_time_reader_test.go b/internal/topic/topicreadercommon/one_time_reader_test.go index e0c9dfeb7..baeac105c 100644 --- a/internal/topic/topicreadercommon/one_time_reader_test.go +++ b/internal/topic/topicreadercommon/one_time_reader_test.go @@ -4,12 +4,13 @@ import ( "bytes" "compress/gzip" "errors" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "io" "testing" "testing/iotest" "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" ) func TestOneTimeReader(t *testing.T) { From 08c70eeacc5d3f26770a2edc518be65d1a2a65a4 Mon Sep 17 00:00:00 2001 From: DLC Date: Thu, 20 Mar 2025 10:28:55 +0300 Subject: [PATCH 17/17] fix: prevent oneTimeReader from failing in GzipDecoderReturnedToPoolAfterClose test --- .../topicreadercommon/one_time_reader_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/topic/topicreadercommon/one_time_reader_test.go b/internal/topic/topicreadercommon/one_time_reader_test.go index baeac105c..bd1c6e80c 100644 --- a/internal/topic/topicreadercommon/one_time_reader_test.go +++ b/internal/topic/topicreadercommon/one_time_reader_test.go @@ -149,6 +149,10 @@ func TestOneTimeReader(t *testing.T) { dm := NewDecoderMap() codec := rawtopiccommon.CodecGzip + dm.AddDecoder(codec, func(input io.Reader) (ReadResetter, error) { + return gzip.NewReader(input) + }) + data := []byte("pool reuse test") var buf bytes.Buffer gzipWriter := gzip.NewWriter(&buf) @@ -160,15 +164,12 @@ func TestOneTimeReader(t *testing.T) { require.NoError(t, err) reader := newOneTimeReaderFromReader(decoder) - _, err = io.ReadAll(&reader) + result, err := io.ReadAll(&reader) require.NoError(t, err) + require.Equal(t, "pool reuse test", string(result)) require.NoError(t, reader.Close(), "Close() should not return error") - reusedDecoder := dm.dp[codec].Get() - require.NotNil(t, reusedDecoder, "Decoder should be retrieved from pool after Close") - dm.dp[codec].Put(reusedDecoder) - var buf2 bytes.Buffer gzipWriter2 := gzip.NewWriter(&buf2) _, err = gzipWriter2.Write([]byte("next message")) @@ -178,9 +179,9 @@ func TestOneTimeReader(t *testing.T) { reader2, err := dm.Decode(codec, &buf2) require.NoError(t, err) - result, err := io.ReadAll(reader2) + result2, err := io.ReadAll(reader2) require.NoError(t, err) - require.Equal(t, "next message", string(result)) + require.Equal(t, "next message", string(result2)) require.NoError(t, reader2.(io.Closer).Close()) })