From 1dd6f7a5dfd650b55902dc4debfd4e5e699c585b Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Fri, 4 Jul 2025 19:15:59 +0300 Subject: [PATCH 01/18] Execute queries without explicit session creation --- .github/workflows/tests.yml | 4 +- CHANGELOG.md | 1 + internal/query/client.go | 81 +++++++++++++++---- internal/query/client_test.go | 44 ++++++++++ internal/query/config/config.go | 20 +++-- internal/query/config/options.go | 11 ++- internal/query/implicit_session_bench_test.go | 61 ++++++++++++++ internal/query/options/execute.go | 10 +++ internal/query/options/retry.go | 4 + options.go | 9 +++ query/execute_options.go | 6 ++ .../implicit_session_bench_test.go | 40 +++++++++ testutil/driver.go | 38 ++++++++- 13 files changed, 298 insertions(+), 31 deletions(-) create mode 100644 internal/query/implicit_session_bench_test.go create mode 100644 tests/integration/implicit_session_bench_test.go diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1e51df45b..1dd828fa3 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -19,7 +19,7 @@ jobs: strategy: fail-fast: false matrix: - go-version: [1.21.x, 1.24.x] + go-version: [1.22.x, 1.24.x] os: [ubuntu, windows, macOS] env: OS: ${{ matrix.os }}-latest @@ -52,7 +52,7 @@ jobs: strategy: fail-fast: false matrix: - go-version: [1.21.x, 1.24.x] + go-version: [1.22.x, 1.24.x] ydb-version: [latest, 24.4, 25.1] os: [ubuntu] services: diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bd1a3acb..67d8aa93c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Added a feature: "Execute queries without explicit session creation" * Fixed the support of server-side session balancing in `database/sql` driver * Added `ydb.WithDisableSessionBalancer()` driver option for disable server-side session balancing on table and query clients diff --git a/internal/query/client.go b/internal/query/client.go index 05491c1f8..e62ab44b2 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -45,6 +45,10 @@ type ( client Ydb_Query_V1.QueryServiceClient pool sessionPool + // implicitSessionPool is a pool of implicit sessions, + // i.e. fake sessions created without CreateSession/AttachSession requests. + implicitSessionPool sessionPool + done chan struct{} } ) @@ -329,7 +333,12 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute onDone(finalErr) }() - row, err := clientQueryRow(ctx, c.pool, q, settings, withTrace(c.config.Trace())) + pool := c.pool + if settings.IsImplicitSession() { + pool = c.implicitSessionPool + } + + row, err := clientQueryRow(ctx, pool, q, settings, withTrace(c.config.Trace())) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -376,7 +385,12 @@ func (c *Client) Exec(ctx context.Context, q string, opts ...options.Execute) (f onDone(finalErr) }() - err := clientExec(ctx, c.pool, q, opts...) + pool := c.pool + if settings.IsImplicitSession() { + pool = c.implicitSessionPool + } + + err := clientExec(ctx, pool, q, opts...) if err != nil { return xerrors.WithStackTrace(err) } @@ -424,7 +438,12 @@ func (c *Client) Query(ctx context.Context, q string, opts ...options.Execute) ( onDone(err) }() - r, err = clientQuery(ctx, c.pool, q, opts...) + pool := c.pool + if settings.IsImplicitSession() { + pool = c.implicitSessionPool + } + + r, err = clientQuery(ctx, pool, q, opts...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -479,7 +498,12 @@ func (c *Client) QueryResultSet( onDone(finalErr, rowsCount) }() - rs, rowsCount, err = clientQueryResultSet(ctx, c.pool, q, settings, withTrace(c.config.Trace())) + pool := c.pool + if settings.IsImplicitSession() { + pool = c.implicitSessionPool + } + + rs, rowsCount, err = clientQueryResultSet(ctx, pool, q, settings, withTrace(c.config.Trace())) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -566,24 +590,25 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) * client := Ydb_Query_V1.NewQueryServiceClient(cc) return &Client{ - config: cfg, - client: client, - done: make(chan struct{}), + config: cfg, + client: client, + done: make(chan struct{}), + implicitSessionPool: createImplicitSessionPool(ctx, cfg, client, cc), pool: pool.New(ctx, - pool.WithLimit[*Session, Session](cfg.PoolLimit()), - pool.WithItemUsageLimit[*Session, Session](cfg.PoolSessionUsageLimit()), - pool.WithItemUsageTTL[*Session, Session](cfg.PoolSessionUsageTTL()), - pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())), - pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()), - pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()), - pool.WithMustDeleteItemFunc[*Session, Session](func(s *Session, err error) bool { + pool.WithLimit[*Session](cfg.PoolLimit()), + pool.WithItemUsageLimit[*Session](cfg.PoolSessionUsageLimit()), + pool.WithItemUsageTTL[*Session](cfg.PoolSessionUsageTTL()), + pool.WithTrace[*Session](poolTrace(cfg.Trace())), + pool.WithCreateItemTimeout[*Session](cfg.SessionCreateTimeout()), + pool.WithCloseItemTimeout[*Session](cfg.SessionDeleteTimeout()), + pool.WithMustDeleteItemFunc(func(s *Session, err error) bool { if !s.IsAlive() { return true } return err != nil && xerrors.MustDeleteTableOrQuerySession(err) }), - pool.WithIdleTimeToLive[*Session, Session](cfg.SessionIdleTimeToLive()), + pool.WithIdleTimeToLive[*Session](cfg.SessionIdleTimeToLive()), pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) { var ( createCtx context.Context @@ -617,6 +642,32 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) * } } +func createImplicitSessionPool(ctx context.Context, + cfg *config.Config, + c Ydb_Query_V1.QueryServiceClient, + cc grpc.ClientConnInterface, +) sessionPool { + return pool.New(ctx, + pool.WithLimit[*Session](cfg.ImplicitSessionPoolLimit()), + pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) { + core := &sessionCore{ + cc: cc, + Client: c, + Trace: cfg.Trace(), + done: make(chan struct{}), + } + + core.SetStatus(StatusIdle) + + return &Session{ + Core: core, + trace: cfg.Trace(), + client: c, + }, nil + }), + ) +} + func poolTrace(t *trace.Query) *pool.Trace { return &pool.Trace{ OnNew: func(ctx *context.Context, call stack.Caller) func(limit int) { diff --git a/internal/query/client_test.go b/internal/query/client_test.go index 1994c9112..21c131255 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -20,11 +20,13 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/testutil" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -842,6 +844,13 @@ func TestClient(t *testing.T) { }), "") require.NoError(t, err) }) + + t.Run("WithImplicitSession", func(t *testing.T) { + err := mockClientForImplicitSessionTest(ctx). + Exec(ctx, "SELECT 1", query.WithImplicitSession()) + + require.NoError(t, err) + }) }) t.Run("Query", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -1079,6 +1088,12 @@ func TestClient(t *testing.T) { require.Nil(t, r3) } }) + t.Run("WithImplicitSession", func(t *testing.T) { + _, err := mockClientForImplicitSessionTest(ctx). + Query(ctx, "SELECT 1", query.WithImplicitSession()) + + require.NoError(t, err) + }) }) t.Run("QueryResultSet", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -1397,6 +1412,12 @@ func TestClient(t *testing.T) { require.Nil(t, rs) require.Equal(t, 0, rowsCount) }) + t.Run("WithImplicitSession", func(t *testing.T) { + _, err := mockClientForImplicitSessionTest(ctx). + QueryResultSet(ctx, "SELECT 1", query.WithImplicitSession()) + + require.NoError(t, err) + }) }) t.Run("QueryRow", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -1537,9 +1558,32 @@ func TestClient(t *testing.T) { require.ErrorIs(t, err, errMoreThanOneRow) require.Nil(t, row) }) + + t.Run("WithImplicitSession", func(t *testing.T) { + _, err := mockClientForImplicitSessionTest(ctx). + QueryRow(ctx, "SELECT 1", query.WithImplicitSession()) + + require.NoError(t, err) + }) }) } +// mockClientForImplicitSessionTest creates a new Client with a test balancer +// for simulating implicit session scenarios in query client testing. It configures +// the mock in such way that calling `CreateSession` or `AttachSession` will result in an error. +func mockClientForImplicitSessionTest(ctx context.Context) *Client { + balancer := testutil.NewBalancer( + testutil.WithInvokeHandlers(testutil.InvokeHandlers{}), + testutil.WithNewStreamHandlers(testutil.NewStreamHandlers{ + testutil.QueryExecuteQuery: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { + return testutil.MockClientStream(), nil + }, + }), + ) + + return New(ctx, balancer, config.New()) +} + type sessionControllerMock struct { id string status Status diff --git a/internal/query/config/config.go b/internal/query/config/config.go index 8eb221f38..bb86c4855 100644 --- a/internal/query/config/config.go +++ b/internal/query/config/config.go @@ -17,9 +17,10 @@ const ( type Config struct { config.Common - poolLimit int - poolSessionUsageLimit uint64 - poolSessionUsageTTL time.Duration + poolLimit int + implicitSessionPoolLimit int + poolSessionUsageLimit uint64 + poolSessionUsageTTL time.Duration sessionCreateTimeout time.Duration sessionDeleteTimeout time.Duration @@ -43,10 +44,11 @@ func New(opts ...Option) *Config { func defaults() *Config { return &Config{ - poolLimit: DefaultPoolMaxSize, - sessionCreateTimeout: DefaultSessionCreateTimeout, - sessionDeleteTimeout: DefaultSessionDeleteTimeout, - trace: &trace.Query{}, + poolLimit: DefaultPoolMaxSize, + implicitSessionPoolLimit: DefaultPoolMaxSize, + sessionCreateTimeout: DefaultSessionCreateTimeout, + sessionDeleteTimeout: DefaultSessionDeleteTimeout, + trace: &trace.Query{}, } } @@ -62,6 +64,10 @@ func (c *Config) PoolLimit() int { return c.poolLimit } +func (c *Config) ImplicitSessionPoolLimit() int { + return c.implicitSessionPoolLimit +} + func (c *Config) PoolSessionUsageLimit() uint64 { return c.poolSessionUsageLimit } diff --git a/internal/query/config/options.go b/internal/query/config/options.go index ad7bda207..835a8ae81 100644 --- a/internal/query/config/options.go +++ b/internal/query/config/options.go @@ -23,9 +23,6 @@ func WithTrace(trace *trace.Query, opts ...trace.QueryComposeOption) Option { } } -// WithPoolLimit defines upper bound of pooled sessions. -// If poolLimit is less than or equal to zero then the -// DefaultPoolMaxSize variable is used as a poolLimit. func WithPoolLimit(size int) Option { return func(c *Config) { if size > 0 { @@ -34,6 +31,14 @@ func WithPoolLimit(size int) Option { } } +func WithImplicitSessionPoolLimit(size int) Option { + return func(c *Config) { + if size > 0 { + c.implicitSessionPoolLimit = size + } + } +} + // WithSessionPoolSessionUsageLimit set pool session max usage: // - if argument type is uint64 - WithSessionPoolSessionUsageLimit limits max usage count of pool session // - if argument type is time.Duration - WithSessionPoolSessionUsageLimit limits max time to live of pool session diff --git a/internal/query/implicit_session_bench_test.go b/internal/query/implicit_session_bench_test.go new file mode 100644 index 000000000..ede24d480 --- /dev/null +++ b/internal/query/implicit_session_bench_test.go @@ -0,0 +1,61 @@ +package query + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" + "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/testutil" +) + +func BenchmarkImplicitSessionsOnBalancerMock(b *testing.B) { + queryAttachStream := testutil.MockClientStream() + queryAttachStream.OnRecvMsg = func(m any) error { + m.(*Ydb_Query.SessionState).Status = Ydb.StatusIds_SUCCESS + + return nil + } + + ctx := context.Background() + balancer := testutil.NewBalancer( + testutil.WithInvokeHandlers(testutil.InvokeHandlers{ + testutil.QueryCreateSession: func(request any) (result proto.Message, err error) { + return &Ydb_Query.CreateSessionResponse{ + SessionId: testutil.SessionID(), + }, nil + }, + }), + testutil.WithNewStreamHandlers(testutil.NewStreamHandlers{ + testutil.QueryExecuteQuery: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { + return testutil.MockClientStream(), nil + }, + testutil.QueryAttachSession: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { + return queryAttachStream, nil + }, + }), + ) + client := New(ctx, balancer, config.New()) + + b.Run("explicit", func(b *testing.B) { + for range b.N { + err := client.Exec(ctx, "SELECT 1") + require.NoError(b, err) + } + }) + + implicitSessionOpt := query.WithImplicitSession() + + b.Run("implicit", func(b *testing.B) { + for range b.N { + err := client.Exec(ctx, "SELECT 1", implicitSessionOpt) + require.NoError(b, err) + } + }) +} diff --git a/internal/query/options/execute.go b/internal/query/options/execute.go index cb60336ae..d6144106b 100644 --- a/internal/query/options/execute.go +++ b/internal/query/options/execute.go @@ -44,6 +44,7 @@ type ( retryOptions []retry.Option responsePartLimitBytes int64 label string + isImplicitSession bool } // Execute is an interface for execute method options @@ -70,6 +71,7 @@ type ( } execModeOption = ExecMode responsePartLimitBytes int64 + implicitSessionOption struct{} ) func (poolID resourcePool) applyExecuteOption(s *executeSettings) { @@ -192,6 +194,14 @@ func (s *executeSettings) Label() string { return s.label } +func (s *executeSettings) IsImplicitSession() bool { + return s.isImplicitSession +} + +func (implicitSessionOption) applyExecuteOption(s *executeSettings) { + s.isImplicitSession = true +} + func WithParameters(params params.Parameters) parametersOption { return parametersOption{ params: params, diff --git a/internal/query/options/retry.go b/internal/query/options/retry.go index 1ce69ff08..e0f5e2673 100644 --- a/internal/query/options/retry.go +++ b/internal/query/options/retry.go @@ -109,6 +109,10 @@ func WithIdempotent() RetryOptionsOption { return []retry.Option{retry.WithIdempotent(true)} } +func WithImplicitSession() implicitSessionOption { + return implicitSessionOption{} +} + func WithLabel(lbl string) LabelOption { return LabelOption(lbl) } diff --git a/options.go b/options.go index 15c083eae..0d8b926e6 100644 --- a/options.go +++ b/options.go @@ -545,6 +545,15 @@ func WithSessionPoolSizeLimit(sizeLimit int) Option { } } +// WithImplicitSessionPoolSizeLimit set max size of implicit sessions pool in query.Client +func WithImplicitSessionPoolSizeLimit(sizeLimit int) Option { + return func(ctx context.Context, d *Driver) error { + d.queryOptions = append(d.queryOptions, queryConfig.WithImplicitSessionPoolLimit(sizeLimit)) + + return nil + } +} + // WithSessionPoolSessionUsageLimit set pool session max usage: // - if argument type is uint64 - WithSessionPoolSessionUsageLimit limits max usage count of pool session // - if argument type is time.Duration - WithSessionPoolSessionUsageLimit limits max time to live of pool session diff --git a/query/execute_options.go b/query/execute_options.go index 784189e9c..d5c35a298 100644 --- a/query/execute_options.go +++ b/query/execute_options.go @@ -49,6 +49,12 @@ func WithExecMode(mode options.ExecMode) ExecuteOption { return options.WithExecMode(mode) } +// WithImplicitSession is an option to execute a query using an implicit session +// which allows the query to be executed without explicitly creating a session +func WithImplicitSession() ExecuteOption { + return options.WithImplicitSession() +} + func WithSyntax(syntax options.Syntax) ExecuteOption { return options.WithSyntax(syntax) } diff --git a/tests/integration/implicit_session_bench_test.go b/tests/integration/implicit_session_bench_test.go new file mode 100644 index 000000000..6cae8655f --- /dev/null +++ b/tests/integration/implicit_session_bench_test.go @@ -0,0 +1,40 @@ +//go:build integration +// +build integration + +package integration + +import ( + "context" + "testing" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/query" +) + +func BenchmarkImplicitSessions(b *testing.B) { + ctx := context.TODO() + db, err := ydb.Open(ctx, "grpc://localhost:2136/local") + if err != nil { + panic(err) + } + defer db.Close(ctx) // cleanup resources + + q := db.Query() + + // Warmup + q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + + b.Run("implicit", func(b *testing.B) { + for range b.N { + q.Query(ctx, `SELECT 42 as id, "my string" as myStr`, + query.WithImplicitSession(), + ) + } + }) + + b.Run("explicit", func(b *testing.B) { + for range b.N { + q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + } + }) +} diff --git a/testutil/driver.go b/testutil/driver.go index d1ffd8e7a..74968de1e 100644 --- a/testutil/driver.go +++ b/testutil/driver.go @@ -3,10 +3,13 @@ package testutil import ( "context" "fmt" + "io" "reflect" "strings" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" @@ -58,6 +61,10 @@ const ( TableDescribeTableOptions TableStreamReadTable TableStreamExecuteScanQuery + + QueryCreateSession + QueryExecuteQuery + QueryAttachSession ) var grpcMethodToCode = map[Method]MethodCode{ @@ -79,6 +86,10 @@ var grpcMethodToCode = map[Method]MethodCode{ "/Ydb.Table.V1.TableService/DescribeTableOptions": TableDescribeTableOptions, "/Ydb.Table.V1.TableService/StreamReadTable": TableStreamReadTable, "/Ydb.Table.V1.TableService/StreamExecuteScanQuery": TableStreamExecuteScanQuery, + + "/Ydb.Query.V1.QueryService/ExecuteQuery": QueryExecuteQuery, + "/Ydb.Query.V1.QueryService/CreateSession": QueryCreateSession, + "/Ydb.Query.V1.QueryService/AttachSession": QueryAttachSession, } var codeToString = map[MethodCode]string{ @@ -107,10 +118,7 @@ func setField(name string, dst, value interface{}) { t := x.Type() f, ok := t.FieldByName(name) if !ok { - panic(fmt.Sprintf( - "struct %s has no field %q", - t, name, - )) + return } v := reflect.ValueOf(value) if f.Type.Kind() != v.Type().Kind() { @@ -365,6 +373,28 @@ func (s *ClientStream) RecvMsg(m interface{}) error { return s.OnRecvMsg(m) } +func MockClientStream() *ClientStream { + var recvMsgAlreadySent bool + + return &ClientStream{ + OnSendMsg: func(m any) error { return nil }, + OnCloseSend: func() error { return nil }, + OnRecvMsg: func(m any) error { + if recvMsgAlreadySent { + return io.EOF + } + recvMsgAlreadySent = true + + switch resp := m.(type) { // you can freely add additional mock data + case *Ydb_Query.ExecuteQueryResponsePart: + resp.ResultSet = &Ydb.ResultSet{Rows: []*Ydb.Value{{}}} + } + + return nil + }, + } +} + func lastSegment(m string) string { s := strings.Split(m, "/") From 0bb2b739ee78e48982dc254eb32c97f316d8772e Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Mon, 7 Jul 2025 16:22:18 +0300 Subject: [PATCH 02/18] fix comments from Code Review --- VERSIONING.md | 3 +- internal/query/client.go | 2 +- internal/query/client_test.go | 72 ++++++++++++++++++- internal/query/config/config.go | 20 ++---- internal/query/config/options.go | 8 --- internal/query/errors.go | 23 +++--- internal/query/execute_query.go | 1 + internal/query/implicit_session_bench_test.go | 61 ---------------- internal/query/session.go | 24 ++++++- internal/query/transaction.go | 16 +++++ options.go | 9 --- .../implicit_session_bench_test.go | 35 ++++++--- testutil/driver.go | 10 ++- 13 files changed, 166 insertions(+), 118 deletions(-) delete mode 100644 internal/query/implicit_session_bench_test.go diff --git a/VERSIONING.md b/VERSIONING.md index 40cb9171c..5054ad5b8 100644 --- a/VERSIONING.md +++ b/VERSIONING.md @@ -21,4 +21,5 @@ We making the following exceptions to those guidelines: - Some public API of `ydb-go-sdk` relate to the internals. - We use the `// Internals` comment for public internals in the `ydb-go-sdk`. - `ydb-go-sdk` internals can be changed at any time without increase of major part of version. - - Internals will never marked as stable + - Internals will never marked as stable + - `testutil` package can be changed at any time without increase of major part of version. diff --git a/internal/query/client.go b/internal/query/client.go index e62ab44b2..a7c2e411b 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -648,7 +648,7 @@ func createImplicitSessionPool(ctx context.Context, cc grpc.ClientConnInterface, ) sessionPool { return pool.New(ctx, - pool.WithLimit[*Session](cfg.ImplicitSessionPoolLimit()), + pool.WithLimit[*Session](cfg.PoolLimit()), pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) { core := &sessionCore{ cc: cc, diff --git a/internal/query/client_test.go b/internal/query/client_test.go index 21c131255..3563f2a22 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc" grpcCodes "google.golang.org/grpc/codes" grpcStatus "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" @@ -172,6 +173,25 @@ func TestClient(t *testing.T) { require.NoError(t, err) require.Equal(t, 10, counter) }) + + t.Run("WithImplicitSession", func(t *testing.T) { + mockClientTransactionalForImplicitSessionTest(ctx). + Do(ctx, func(ctx context.Context, s query.Session) error { + err := s.Exec(ctx, "SELECT 1", query.WithImplicitSession()) + require.ErrorContains(t, err, "implicit sessions are not supported") + + _, err = s.Query(ctx, "SELECT 1", query.WithImplicitSession()) + require.ErrorContains(t, err, "implicit sessions are not supported") + + _, err = s.QueryResultSet(ctx, "SELECT 1", query.WithImplicitSession()) + require.ErrorContains(t, err, "implicit sessions are not supported") + + _, err = s.QueryRow(ctx, "SELECT 1", query.WithImplicitSession()) + require.ErrorContains(t, err, "implicit sessions are not supported") + + return nil + }) + }) }) t.Run("DoTx", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -674,6 +694,27 @@ func TestClient(t *testing.T) { }) }) }) + + t.Run("WithImplicitSession", func(t *testing.T) { + err := mockClientTransactionalForImplicitSessionTest(ctx). + DoTx(ctx, func(ctx context.Context, tx query.TxActor) error { + err := tx.Exec(ctx, "SELECT 1", query.WithImplicitSession()) + require.ErrorContains(t, err, "implicit sessions are not supported") + + _, err = tx.Query(ctx, "SELECT 1", query.WithImplicitSession()) + require.ErrorContains(t, err, "implicit sessions are not supported") + + _, err = tx.QueryResultSet(ctx, "SELECT 1", query.WithImplicitSession()) + require.ErrorContains(t, err, "implicit sessions are not supported") + + _, err = tx.QueryRow(ctx, "SELECT 1", query.WithImplicitSession()) + require.ErrorContains(t, err, "implicit sessions are not supported") + + return nil + }) + + require.NoError(t, err) // no transaction errors + }) }) t.Run("Exec", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -1573,7 +1614,6 @@ func TestClient(t *testing.T) { // the mock in such way that calling `CreateSession` or `AttachSession` will result in an error. func mockClientForImplicitSessionTest(ctx context.Context) *Client { balancer := testutil.NewBalancer( - testutil.WithInvokeHandlers(testutil.InvokeHandlers{}), testutil.WithNewStreamHandlers(testutil.NewStreamHandlers{ testutil.QueryExecuteQuery: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { return testutil.MockClientStream(), nil @@ -1584,6 +1624,36 @@ func mockClientForImplicitSessionTest(ctx context.Context) *Client { return New(ctx, balancer, config.New()) } +// mockClientTransactionalForImplicitSessionTest creates a new Client with a test balancer +// for simulating transactional implicit session scenarios in query client testing. +// It configures the mock to return successful responses for session creation and attachment. +// But returns error if real Qeury was sended. +func mockClientTransactionalForImplicitSessionTest(ctx context.Context) *Client { + balancer := testutil.NewBalancer( + testutil.WithInvokeHandlers(testutil.InvokeHandlers{ + testutil.QueryCreateSession: func(any) (proto.Message, error) { + return &Ydb_Query.CreateSessionResponse{}, nil + }, + testutil.QueryBeginTransaction: func(any) (proto.Message, error) { + return &Ydb_Query.BeginTransactionResponse{}, nil + }, + testutil.QueryCommitTransaction: func(any) (proto.Message, error) { + return &Ydb_Query.CommitTransactionResponse{}, nil + }, + }), + testutil.WithNewStreamHandlers(testutil.NewStreamHandlers{ + testutil.QueryExecuteQuery: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { + return nil, errors.New("ExecuteQuery should not be called for transactional implicit session") + }, + testutil.QueryAttachSession: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { + return testutil.MockClientStream(), nil + }, + }), + ) + + return New(ctx, balancer, config.New()) +} + type sessionControllerMock struct { id string status Status diff --git a/internal/query/config/config.go b/internal/query/config/config.go index bb86c4855..8eb221f38 100644 --- a/internal/query/config/config.go +++ b/internal/query/config/config.go @@ -17,10 +17,9 @@ const ( type Config struct { config.Common - poolLimit int - implicitSessionPoolLimit int - poolSessionUsageLimit uint64 - poolSessionUsageTTL time.Duration + poolLimit int + poolSessionUsageLimit uint64 + poolSessionUsageTTL time.Duration sessionCreateTimeout time.Duration sessionDeleteTimeout time.Duration @@ -44,11 +43,10 @@ func New(opts ...Option) *Config { func defaults() *Config { return &Config{ - poolLimit: DefaultPoolMaxSize, - implicitSessionPoolLimit: DefaultPoolMaxSize, - sessionCreateTimeout: DefaultSessionCreateTimeout, - sessionDeleteTimeout: DefaultSessionDeleteTimeout, - trace: &trace.Query{}, + poolLimit: DefaultPoolMaxSize, + sessionCreateTimeout: DefaultSessionCreateTimeout, + sessionDeleteTimeout: DefaultSessionDeleteTimeout, + trace: &trace.Query{}, } } @@ -64,10 +62,6 @@ func (c *Config) PoolLimit() int { return c.poolLimit } -func (c *Config) ImplicitSessionPoolLimit() int { - return c.implicitSessionPoolLimit -} - func (c *Config) PoolSessionUsageLimit() uint64 { return c.poolSessionUsageLimit } diff --git a/internal/query/config/options.go b/internal/query/config/options.go index 835a8ae81..3553c11ac 100644 --- a/internal/query/config/options.go +++ b/internal/query/config/options.go @@ -31,14 +31,6 @@ func WithPoolLimit(size int) Option { } } -func WithImplicitSessionPoolLimit(size int) Option { - return func(c *Config) { - if size > 0 { - c.implicitSessionPoolLimit = size - } - } -} - // WithSessionPoolSessionUsageLimit set pool session max usage: // - if argument type is uint64 - WithSessionPoolSessionUsageLimit limits max usage count of pool session // - if argument type is time.Duration - WithSessionPoolSessionUsageLimit limits max time to live of pool session diff --git a/internal/query/errors.go b/internal/query/errors.go index 3a21083d3..b52feb1a1 100644 --- a/internal/query/errors.go +++ b/internal/query/errors.go @@ -7,15 +7,16 @@ import ( ) var ( - errNilClient = xerrors.Wrap(errors.New("table client is not initialized")) - ErrTransactionRollingBack = xerrors.Wrap(errors.New("the transaction is rolling back")) - errWrongNextResultSetIndex = errors.New("wrong result set index") - errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index") - errMoreThanOneRow = errors.New("unexpected more than one row in result set") - errMoreThanOneResultSet = errors.New("unexpected more than one result set") - errNoResultSets = errors.New("no result sets") - errNilOption = errors.New("nil option") - ErrOptionNotForTxExecute = errors.New("option is not for execute on transaction") - errExecuteOnCompletedTx = errors.New("execute on completed transaction") - errSessionClosed = errors.New("session is closed") + errNilClient = xerrors.Wrap(errors.New("table client is not initialized")) + ErrTransactionRollingBack = xerrors.Wrap(errors.New("the transaction is rolling back")) + errWrongNextResultSetIndex = errors.New("wrong result set index") + errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index") + errMoreThanOneRow = errors.New("unexpected more than one row in result set") + errMoreThanOneResultSet = errors.New("unexpected more than one result set") + errNoResultSets = errors.New("no result sets") + errNilOption = errors.New("nil option") + ErrOptionNotForTxExecute = errors.New("option is not for execute on transaction") + errExecuteOnCompletedTx = errors.New("execute on completed transaction") + errSessionClosed = errors.New("session is closed") + errImplicitSessionsNotSupported = errors.New("implicit sessions are not supported") ) diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index 71c408c7b..7fe8799bb 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -32,6 +32,7 @@ type executeSettings interface { ResourcePool() string ResponsePartLimitSizeBytes() int64 Label() string + IsImplicitSession() bool } type executeScriptConfig interface { diff --git a/internal/query/implicit_session_bench_test.go b/internal/query/implicit_session_bench_test.go deleted file mode 100644 index ede24d480..000000000 --- a/internal/query/implicit_session_bench_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package query - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" - "google.golang.org/grpc" - "google.golang.org/protobuf/proto" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" - "github.com/ydb-platform/ydb-go-sdk/v3/query" - "github.com/ydb-platform/ydb-go-sdk/v3/testutil" -) - -func BenchmarkImplicitSessionsOnBalancerMock(b *testing.B) { - queryAttachStream := testutil.MockClientStream() - queryAttachStream.OnRecvMsg = func(m any) error { - m.(*Ydb_Query.SessionState).Status = Ydb.StatusIds_SUCCESS - - return nil - } - - ctx := context.Background() - balancer := testutil.NewBalancer( - testutil.WithInvokeHandlers(testutil.InvokeHandlers{ - testutil.QueryCreateSession: func(request any) (result proto.Message, err error) { - return &Ydb_Query.CreateSessionResponse{ - SessionId: testutil.SessionID(), - }, nil - }, - }), - testutil.WithNewStreamHandlers(testutil.NewStreamHandlers{ - testutil.QueryExecuteQuery: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { - return testutil.MockClientStream(), nil - }, - testutil.QueryAttachSession: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { - return queryAttachStream, nil - }, - }), - ) - client := New(ctx, balancer, config.New()) - - b.Run("explicit", func(b *testing.B) { - for range b.N { - err := client.Exec(ctx, "SELECT 1") - require.NoError(b, err) - } - }) - - implicitSessionOpt := query.WithImplicitSession() - - b.Run("implicit", func(b *testing.B) { - for range b.N { - err := client.Exec(ctx, "SELECT 1", implicitSessionOpt) - require.NoError(b, err) - } - }) -} diff --git a/internal/query/session.go b/internal/query/session.go index dce1cc130..e020c3e92 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -36,7 +36,13 @@ func (s *Session) QueryResultSet( onDone(finalErr) }() - r, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace)) + settings := options.ExecuteSettings(opts...) + + if settings.IsImplicitSession() { + return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) + } + + r, err := s.execute(ctx, q, settings, withTrace(s.trace)) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -75,7 +81,13 @@ func (s *Session) QueryRow(ctx context.Context, q string, opts ...options.Execut onDone(finalErr) }() - row, err := s.queryRow(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace)) + settings := options.ExecuteSettings(opts...) + + if settings.IsImplicitSession() { + return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) + } + + row, err := s.queryRow(ctx, q, settings, withTrace(s.trace)) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -164,6 +176,10 @@ func (s *Session) Exec(ctx context.Context, q string, opts ...options.Execute) ( onDone(finalErr) }() + if settings.IsImplicitSession() { + return xerrors.WithStackTrace(errImplicitSessionsNotSupported) + } + r, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { return xerrors.WithStackTrace(err) @@ -192,6 +208,10 @@ func (s *Session) Query(ctx context.Context, q string, opts ...options.Execute) onDone(finalErr) }() + if settings.IsImplicitSession() { + return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) + } + r, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { return nil, xerrors.WithStackTrace(err) diff --git a/internal/query/transaction.go b/internal/query/transaction.go index 3f7f65389..f50bd9393 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -86,6 +86,10 @@ func (tx *Transaction) QueryResultSet( onDone(finalErr) }() + if txSettings.IsImplicitSession() { + return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) + } + if tx.completed { return nil, xerrors.WithStackTrace(errExecuteOnCompletedTx) } @@ -139,6 +143,10 @@ func (tx *Transaction) QueryRow( onDone(finalErr) }() + if txSettings.IsImplicitSession() { + return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) + } + resultOpts := []resultOption{ withTrace(tx.s.trace), onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { @@ -203,6 +211,10 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu onDone(finalErr) }() + if txSettings.IsImplicitSession() { + return xerrors.WithStackTrace(errImplicitSessionsNotSupported) + } + if tx.completed { return xerrors.WithStackTrace(errExecuteOnCompletedTx) } @@ -276,6 +288,10 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec onDone(finalErr) }() + if txSettings.IsImplicitSession() { + return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) + } + if tx.completed { return nil, xerrors.WithStackTrace(errExecuteOnCompletedTx) } diff --git a/options.go b/options.go index 0d8b926e6..15c083eae 100644 --- a/options.go +++ b/options.go @@ -545,15 +545,6 @@ func WithSessionPoolSizeLimit(sizeLimit int) Option { } } -// WithImplicitSessionPoolSizeLimit set max size of implicit sessions pool in query.Client -func WithImplicitSessionPoolSizeLimit(sizeLimit int) Option { - return func(ctx context.Context, d *Driver) error { - d.queryOptions = append(d.queryOptions, queryConfig.WithImplicitSessionPoolLimit(sizeLimit)) - - return nil - } -} - // WithSessionPoolSessionUsageLimit set pool session max usage: // - if argument type is uint64 - WithSessionPoolSessionUsageLimit limits max usage count of pool session // - if argument type is time.Duration - WithSessionPoolSessionUsageLimit limits max time to live of pool session diff --git a/tests/integration/implicit_session_bench_test.go b/tests/integration/implicit_session_bench_test.go index 6cae8655f..87b11715f 100644 --- a/tests/integration/implicit_session_bench_test.go +++ b/tests/integration/implicit_session_bench_test.go @@ -7,11 +7,12 @@ import ( "context" "testing" + "github.com/stretchr/testify/require" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/query" ) -func BenchmarkImplicitSessions(b *testing.B) { +func BenchmarkQuery_Query_WithImplicitSession(b *testing.B) { ctx := context.TODO() db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { @@ -22,19 +23,37 @@ func BenchmarkImplicitSessions(b *testing.B) { q := db.Query() // Warmup - q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + _, err = q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + require.NoError(b, err) - b.Run("implicit", func(b *testing.B) { - for range b.N { - q.Query(ctx, `SELECT 42 as id, "my string" as myStr`, + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := q.Query(ctx, `SELECT 42 as id, "my string" as myStr`, query.WithImplicitSession(), ) + require.NoError(b, err) } }) +} + +func BenchmarkQuery_Query(b *testing.B) { + ctx := context.TODO() + db, err := ydb.Open(ctx, "grpc://localhost:2136/local") + if err != nil { + panic(err) + } + defer db.Close(ctx) // cleanup resources + + q := db.Query() + + // Warmup + _, err = q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + require.NoError(b, err) - b.Run("explicit", func(b *testing.B) { - for range b.N { - q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + require.NoError(b, err) } }) } diff --git a/testutil/driver.go b/testutil/driver.go index 74968de1e..672bed768 100644 --- a/testutil/driver.go +++ b/testutil/driver.go @@ -65,6 +65,8 @@ const ( QueryCreateSession QueryExecuteQuery QueryAttachSession + QueryBeginTransaction + QueryCommitTransaction ) var grpcMethodToCode = map[Method]MethodCode{ @@ -87,9 +89,11 @@ var grpcMethodToCode = map[Method]MethodCode{ "/Ydb.Table.V1.TableService/StreamReadTable": TableStreamReadTable, "/Ydb.Table.V1.TableService/StreamExecuteScanQuery": TableStreamExecuteScanQuery, - "/Ydb.Query.V1.QueryService/ExecuteQuery": QueryExecuteQuery, - "/Ydb.Query.V1.QueryService/CreateSession": QueryCreateSession, - "/Ydb.Query.V1.QueryService/AttachSession": QueryAttachSession, + "/Ydb.Query.V1.QueryService/ExecuteQuery": QueryExecuteQuery, + "/Ydb.Query.V1.QueryService/CreateSession": QueryCreateSession, + "/Ydb.Query.V1.QueryService/AttachSession": QueryAttachSession, + "/Ydb.Query.V1.QueryService/BeginTransaction": QueryBeginTransaction, + "/Ydb.Query.V1.QueryService/CommitTransaction": QueryCommitTransaction, } var codeToString = map[MethodCode]string{ From 4334b26951472ac431bff5ea82c62927f14cc4c2 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Mon, 7 Jul 2025 17:12:57 +0300 Subject: [PATCH 03/18] fixes from Code Review --- internal/query/client_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/query/client_test.go b/internal/query/client_test.go index 3563f2a22..f44a4791f 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -175,7 +175,7 @@ func TestClient(t *testing.T) { }) t.Run("WithImplicitSession", func(t *testing.T) { - mockClientTransactionalForImplicitSessionTest(ctx). + err := mockClientTransactionalForImplicitSessionTest(ctx). Do(ctx, func(ctx context.Context, s query.Session) error { err := s.Exec(ctx, "SELECT 1", query.WithImplicitSession()) require.ErrorContains(t, err, "implicit sessions are not supported") @@ -191,6 +191,7 @@ func TestClient(t *testing.T) { return nil }) + require.NoError(t, err) }) }) t.Run("DoTx", func(t *testing.T) { From 5f7a7d19f6fe8853dbc055080b4450cff48bc008 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Mon, 7 Jul 2025 18:01:56 +0300 Subject: [PATCH 04/18] fixes from Code Review --- internal/query/config/options.go | 3 +++ internal/query/options/execute.go | 4 ++++ internal/query/options/retry.go | 4 ---- query/execute_options.go | 3 ++- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/query/config/options.go b/internal/query/config/options.go index 3553c11ac..ad7bda207 100644 --- a/internal/query/config/options.go +++ b/internal/query/config/options.go @@ -23,6 +23,9 @@ func WithTrace(trace *trace.Query, opts ...trace.QueryComposeOption) Option { } } +// WithPoolLimit defines upper bound of pooled sessions. +// If poolLimit is less than or equal to zero then the +// DefaultPoolMaxSize variable is used as a poolLimit. func WithPoolLimit(size int) Option { return func(c *Config) { if size > 0 { diff --git a/internal/query/options/execute.go b/internal/query/options/execute.go index d6144106b..8920b20f6 100644 --- a/internal/query/options/execute.go +++ b/internal/query/options/execute.go @@ -230,6 +230,10 @@ func WithExecMode(mode ExecMode) execModeOption { return mode } +func WithImplicitSession() implicitSessionOption { + return implicitSessionOption{} +} + func WithResponsePartLimitSizeBytes(size int64) responsePartLimitBytes { return responsePartLimitBytes(size) } diff --git a/internal/query/options/retry.go b/internal/query/options/retry.go index e0f5e2673..1ce69ff08 100644 --- a/internal/query/options/retry.go +++ b/internal/query/options/retry.go @@ -109,10 +109,6 @@ func WithIdempotent() RetryOptionsOption { return []retry.Option{retry.WithIdempotent(true)} } -func WithImplicitSession() implicitSessionOption { - return implicitSessionOption{} -} - func WithLabel(lbl string) LabelOption { return LabelOption(lbl) } diff --git a/query/execute_options.go b/query/execute_options.go index d5c35a298..b3433e753 100644 --- a/query/execute_options.go +++ b/query/execute_options.go @@ -50,7 +50,8 @@ func WithExecMode(mode options.ExecMode) ExecuteOption { } // WithImplicitSession is an option to execute a query using an implicit session -// which allows the query to be executed without explicitly creating a session +// which allows the query to be executed without explicitly creating a session. +// Please note that requests with this option use a separate session pool. func WithImplicitSession() ExecuteOption { return options.WithImplicitSession() } From 8240f207ea31ab107d04658153b7fd4054c045bc Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Mon, 7 Jul 2025 18:14:58 +0300 Subject: [PATCH 05/18] fixes from Code Review --- tests/integration/implicit_session_bench_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/implicit_session_bench_test.go b/tests/integration/implicit_session_bench_test.go index 87b11715f..d01671e7b 100644 --- a/tests/integration/implicit_session_bench_test.go +++ b/tests/integration/implicit_session_bench_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/query" ) From 41a20e9e1ef804e3a8a5ba135d562c6063824656 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Mon, 7 Jul 2025 19:27:39 +0300 Subject: [PATCH 06/18] fixes from Code Review --- .github/workflows/tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1dd828fa3..1e51df45b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -19,7 +19,7 @@ jobs: strategy: fail-fast: false matrix: - go-version: [1.22.x, 1.24.x] + go-version: [1.21.x, 1.24.x] os: [ubuntu, windows, macOS] env: OS: ${{ matrix.os }}-latest @@ -52,7 +52,7 @@ jobs: strategy: fail-fast: false matrix: - go-version: [1.22.x, 1.24.x] + go-version: [1.21.x, 1.24.x] ydb-version: [latest, 24.4, 25.1] os: [ubuntu] services: From 5fc898c48903400f94dd6ad295b371f2f9609251 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Wed, 9 Jul 2025 16:28:10 +0300 Subject: [PATCH 07/18] fixes from Code Review --- CHANGELOG.md | 3 +- internal/query/client.go | 8 +-- internal/query/client_test.go | 86 ++++--------------------------- internal/query/config/config.go | 6 +++ internal/query/config/options.go | 9 ++++ internal/query/errors.go | 23 ++++----- internal/query/execute_query.go | 1 - internal/query/options/execute.go | 14 ----- internal/query/session.go | 24 +-------- internal/query/transaction.go | 16 ------ query/execute_options.go | 7 --- 11 files changed, 42 insertions(+), 155 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 355c9289d..90894806d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,5 @@ * Added a feature: "Execute queries without explicit session creation": - * added `ydb.WithImplicitSessionPoolSizeLimit()` option for limitation of implicit sessions - * added `query.WithImplicitSession()` option for execute query with implicit session + * added `config.WithImplicitSessions` query option for execute queries with implicit sessions ## v3.112.0 * Added support for the `json.Unmarshaler` interface in the `CastTo` function for use in scanners, such as the `ScanStruct` method diff --git a/internal/query/client.go b/internal/query/client.go index a7c2e411b..6b9a9f523 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -334,7 +334,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute }() pool := c.pool - if settings.IsImplicitSession() { + if c.config.IsImplicitSession() { pool = c.implicitSessionPool } @@ -386,7 +386,7 @@ func (c *Client) Exec(ctx context.Context, q string, opts ...options.Execute) (f }() pool := c.pool - if settings.IsImplicitSession() { + if c.config.IsImplicitSession() { pool = c.implicitSessionPool } @@ -439,7 +439,7 @@ func (c *Client) Query(ctx context.Context, q string, opts ...options.Execute) ( }() pool := c.pool - if settings.IsImplicitSession() { + if c.config.IsImplicitSession() { pool = c.implicitSessionPool } @@ -499,7 +499,7 @@ func (c *Client) QueryResultSet( }() pool := c.pool - if settings.IsImplicitSession() { + if c.config.IsImplicitSession() { pool = c.implicitSessionPool } diff --git a/internal/query/client_test.go b/internal/query/client_test.go index f44a4791f..f9a01faeb 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -17,7 +17,6 @@ import ( "google.golang.org/grpc" grpcCodes "google.golang.org/grpc/codes" grpcStatus "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" @@ -173,26 +172,6 @@ func TestClient(t *testing.T) { require.NoError(t, err) require.Equal(t, 10, counter) }) - - t.Run("WithImplicitSession", func(t *testing.T) { - err := mockClientTransactionalForImplicitSessionTest(ctx). - Do(ctx, func(ctx context.Context, s query.Session) error { - err := s.Exec(ctx, "SELECT 1", query.WithImplicitSession()) - require.ErrorContains(t, err, "implicit sessions are not supported") - - _, err = s.Query(ctx, "SELECT 1", query.WithImplicitSession()) - require.ErrorContains(t, err, "implicit sessions are not supported") - - _, err = s.QueryResultSet(ctx, "SELECT 1", query.WithImplicitSession()) - require.ErrorContains(t, err, "implicit sessions are not supported") - - _, err = s.QueryRow(ctx, "SELECT 1", query.WithImplicitSession()) - require.ErrorContains(t, err, "implicit sessions are not supported") - - return nil - }) - require.NoError(t, err) - }) }) t.Run("DoTx", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -695,27 +674,6 @@ func TestClient(t *testing.T) { }) }) }) - - t.Run("WithImplicitSession", func(t *testing.T) { - err := mockClientTransactionalForImplicitSessionTest(ctx). - DoTx(ctx, func(ctx context.Context, tx query.TxActor) error { - err := tx.Exec(ctx, "SELECT 1", query.WithImplicitSession()) - require.ErrorContains(t, err, "implicit sessions are not supported") - - _, err = tx.Query(ctx, "SELECT 1", query.WithImplicitSession()) - require.ErrorContains(t, err, "implicit sessions are not supported") - - _, err = tx.QueryResultSet(ctx, "SELECT 1", query.WithImplicitSession()) - require.ErrorContains(t, err, "implicit sessions are not supported") - - _, err = tx.QueryRow(ctx, "SELECT 1", query.WithImplicitSession()) - require.ErrorContains(t, err, "implicit sessions are not supported") - - return nil - }) - - require.NoError(t, err) // no transaction errors - }) }) t.Run("Exec", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -889,7 +847,7 @@ func TestClient(t *testing.T) { t.Run("WithImplicitSession", func(t *testing.T) { err := mockClientForImplicitSessionTest(ctx). - Exec(ctx, "SELECT 1", query.WithImplicitSession()) + Exec(ctx, "SELECT 1") require.NoError(t, err) }) @@ -1132,7 +1090,7 @@ func TestClient(t *testing.T) { }) t.Run("WithImplicitSession", func(t *testing.T) { _, err := mockClientForImplicitSessionTest(ctx). - Query(ctx, "SELECT 1", query.WithImplicitSession()) + Query(ctx, "SELECT 1") require.NoError(t, err) }) @@ -1456,7 +1414,7 @@ func TestClient(t *testing.T) { }) t.Run("WithImplicitSession", func(t *testing.T) { _, err := mockClientForImplicitSessionTest(ctx). - QueryResultSet(ctx, "SELECT 1", query.WithImplicitSession()) + QueryResultSet(ctx, "SELECT 1") require.NoError(t, err) }) @@ -1603,7 +1561,7 @@ func TestClient(t *testing.T) { t.Run("WithImplicitSession", func(t *testing.T) { _, err := mockClientForImplicitSessionTest(ctx). - QueryRow(ctx, "SELECT 1", query.WithImplicitSession()) + QueryRow(ctx, "SELECT 1") require.NoError(t, err) }) @@ -1614,45 +1572,19 @@ func TestClient(t *testing.T) { // for simulating implicit session scenarios in query client testing. It configures // the mock in such way that calling `CreateSession` or `AttachSession` will result in an error. func mockClientForImplicitSessionTest(ctx context.Context) *Client { - balancer := testutil.NewBalancer( - testutil.WithNewStreamHandlers(testutil.NewStreamHandlers{ - testutil.QueryExecuteQuery: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { - return testutil.MockClientStream(), nil - }, - }), - ) - - return New(ctx, balancer, config.New()) + return New(ctx, mockConnForImplicitSessionTest(), config.New( + config.WithImplicitSessions(), + )) } -// mockClientTransactionalForImplicitSessionTest creates a new Client with a test balancer -// for simulating transactional implicit session scenarios in query client testing. -// It configures the mock to return successful responses for session creation and attachment. -// But returns error if real Qeury was sended. -func mockClientTransactionalForImplicitSessionTest(ctx context.Context) *Client { - balancer := testutil.NewBalancer( - testutil.WithInvokeHandlers(testutil.InvokeHandlers{ - testutil.QueryCreateSession: func(any) (proto.Message, error) { - return &Ydb_Query.CreateSessionResponse{}, nil - }, - testutil.QueryBeginTransaction: func(any) (proto.Message, error) { - return &Ydb_Query.BeginTransactionResponse{}, nil - }, - testutil.QueryCommitTransaction: func(any) (proto.Message, error) { - return &Ydb_Query.CommitTransactionResponse{}, nil - }, - }), +func mockConnForImplicitSessionTest() grpc.ClientConnInterface { + return testutil.NewBalancer( testutil.WithNewStreamHandlers(testutil.NewStreamHandlers{ testutil.QueryExecuteQuery: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { - return nil, errors.New("ExecuteQuery should not be called for transactional implicit session") - }, - testutil.QueryAttachSession: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { return testutil.MockClientStream(), nil }, }), ) - - return New(ctx, balancer, config.New()) } type sessionControllerMock struct { diff --git a/internal/query/config/config.go b/internal/query/config/config.go index 8eb221f38..b281eb734 100644 --- a/internal/query/config/config.go +++ b/internal/query/config/config.go @@ -25,6 +25,8 @@ type Config struct { sessionDeleteTimeout time.Duration sessionIddleTimeToLive time.Duration + implicitSession bool + lazyTx bool trace *trace.Query @@ -62,6 +64,10 @@ func (c *Config) PoolLimit() int { return c.poolLimit } +func (c *Config) IsImplicitSession() bool { + return c.implicitSession +} + func (c *Config) PoolSessionUsageLimit() uint64 { return c.poolSessionUsageLimit } diff --git a/internal/query/config/options.go b/internal/query/config/options.go index ad7bda207..156a09dae 100644 --- a/internal/query/config/options.go +++ b/internal/query/config/options.go @@ -80,6 +80,15 @@ func WithSessionIdleTimeToLive(idleTimeToLive time.Duration) Option { } } +// WithImplicitSessions is an option to execute queries using an implicit session +// which allows the queries to be executed without explicitly creating a session. +// Please note that requests with this option use a separate session pool. +func WithImplicitSessions() Option { + return func(c *Config) { + c.implicitSession = true + } +} + func WithLazyTx(lazyTx bool) Option { return func(c *Config) { c.lazyTx = lazyTx diff --git a/internal/query/errors.go b/internal/query/errors.go index b52feb1a1..3a21083d3 100644 --- a/internal/query/errors.go +++ b/internal/query/errors.go @@ -7,16 +7,15 @@ import ( ) var ( - errNilClient = xerrors.Wrap(errors.New("table client is not initialized")) - ErrTransactionRollingBack = xerrors.Wrap(errors.New("the transaction is rolling back")) - errWrongNextResultSetIndex = errors.New("wrong result set index") - errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index") - errMoreThanOneRow = errors.New("unexpected more than one row in result set") - errMoreThanOneResultSet = errors.New("unexpected more than one result set") - errNoResultSets = errors.New("no result sets") - errNilOption = errors.New("nil option") - ErrOptionNotForTxExecute = errors.New("option is not for execute on transaction") - errExecuteOnCompletedTx = errors.New("execute on completed transaction") - errSessionClosed = errors.New("session is closed") - errImplicitSessionsNotSupported = errors.New("implicit sessions are not supported") + errNilClient = xerrors.Wrap(errors.New("table client is not initialized")) + ErrTransactionRollingBack = xerrors.Wrap(errors.New("the transaction is rolling back")) + errWrongNextResultSetIndex = errors.New("wrong result set index") + errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index") + errMoreThanOneRow = errors.New("unexpected more than one row in result set") + errMoreThanOneResultSet = errors.New("unexpected more than one result set") + errNoResultSets = errors.New("no result sets") + errNilOption = errors.New("nil option") + ErrOptionNotForTxExecute = errors.New("option is not for execute on transaction") + errExecuteOnCompletedTx = errors.New("execute on completed transaction") + errSessionClosed = errors.New("session is closed") ) diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index 7fe8799bb..71c408c7b 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -32,7 +32,6 @@ type executeSettings interface { ResourcePool() string ResponsePartLimitSizeBytes() int64 Label() string - IsImplicitSession() bool } type executeScriptConfig interface { diff --git a/internal/query/options/execute.go b/internal/query/options/execute.go index 8920b20f6..cb60336ae 100644 --- a/internal/query/options/execute.go +++ b/internal/query/options/execute.go @@ -44,7 +44,6 @@ type ( retryOptions []retry.Option responsePartLimitBytes int64 label string - isImplicitSession bool } // Execute is an interface for execute method options @@ -71,7 +70,6 @@ type ( } execModeOption = ExecMode responsePartLimitBytes int64 - implicitSessionOption struct{} ) func (poolID resourcePool) applyExecuteOption(s *executeSettings) { @@ -194,14 +192,6 @@ func (s *executeSettings) Label() string { return s.label } -func (s *executeSettings) IsImplicitSession() bool { - return s.isImplicitSession -} - -func (implicitSessionOption) applyExecuteOption(s *executeSettings) { - s.isImplicitSession = true -} - func WithParameters(params params.Parameters) parametersOption { return parametersOption{ params: params, @@ -230,10 +220,6 @@ func WithExecMode(mode ExecMode) execModeOption { return mode } -func WithImplicitSession() implicitSessionOption { - return implicitSessionOption{} -} - func WithResponsePartLimitSizeBytes(size int64) responsePartLimitBytes { return responsePartLimitBytes(size) } diff --git a/internal/query/session.go b/internal/query/session.go index e020c3e92..dce1cc130 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -36,13 +36,7 @@ func (s *Session) QueryResultSet( onDone(finalErr) }() - settings := options.ExecuteSettings(opts...) - - if settings.IsImplicitSession() { - return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) - } - - r, err := s.execute(ctx, q, settings, withTrace(s.trace)) + r, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -81,13 +75,7 @@ func (s *Session) QueryRow(ctx context.Context, q string, opts ...options.Execut onDone(finalErr) }() - settings := options.ExecuteSettings(opts...) - - if settings.IsImplicitSession() { - return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) - } - - row, err := s.queryRow(ctx, q, settings, withTrace(s.trace)) + row, err := s.queryRow(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -176,10 +164,6 @@ func (s *Session) Exec(ctx context.Context, q string, opts ...options.Execute) ( onDone(finalErr) }() - if settings.IsImplicitSession() { - return xerrors.WithStackTrace(errImplicitSessionsNotSupported) - } - r, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { return xerrors.WithStackTrace(err) @@ -208,10 +192,6 @@ func (s *Session) Query(ctx context.Context, q string, opts ...options.Execute) onDone(finalErr) }() - if settings.IsImplicitSession() { - return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) - } - r, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { return nil, xerrors.WithStackTrace(err) diff --git a/internal/query/transaction.go b/internal/query/transaction.go index f50bd9393..3f7f65389 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -86,10 +86,6 @@ func (tx *Transaction) QueryResultSet( onDone(finalErr) }() - if txSettings.IsImplicitSession() { - return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) - } - if tx.completed { return nil, xerrors.WithStackTrace(errExecuteOnCompletedTx) } @@ -143,10 +139,6 @@ func (tx *Transaction) QueryRow( onDone(finalErr) }() - if txSettings.IsImplicitSession() { - return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) - } - resultOpts := []resultOption{ withTrace(tx.s.trace), onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { @@ -211,10 +203,6 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu onDone(finalErr) }() - if txSettings.IsImplicitSession() { - return xerrors.WithStackTrace(errImplicitSessionsNotSupported) - } - if tx.completed { return xerrors.WithStackTrace(errExecuteOnCompletedTx) } @@ -288,10 +276,6 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec onDone(finalErr) }() - if txSettings.IsImplicitSession() { - return nil, xerrors.WithStackTrace(errImplicitSessionsNotSupported) - } - if tx.completed { return nil, xerrors.WithStackTrace(errExecuteOnCompletedTx) } diff --git a/query/execute_options.go b/query/execute_options.go index b3433e753..784189e9c 100644 --- a/query/execute_options.go +++ b/query/execute_options.go @@ -49,13 +49,6 @@ func WithExecMode(mode options.ExecMode) ExecuteOption { return options.WithExecMode(mode) } -// WithImplicitSession is an option to execute a query using an implicit session -// which allows the query to be executed without explicitly creating a session. -// Please note that requests with this option use a separate session pool. -func WithImplicitSession() ExecuteOption { - return options.WithImplicitSession() -} - func WithSyntax(syntax options.Syntax) ExecuteOption { return options.WithSyntax(syntax) } From feb5d87c8da3656ec0844870dd54b9b37c861333 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Wed, 9 Jul 2025 17:45:07 +0300 Subject: [PATCH 08/18] added benchmarks --- .../implicit_session_bench_test.go | 101 +++++++++++------- 1 file changed, 65 insertions(+), 36 deletions(-) diff --git a/tests/integration/implicit_session_bench_test.go b/tests/integration/implicit_session_bench_test.go index d01671e7b..9c4aaf994 100644 --- a/tests/integration/implicit_session_bench_test.go +++ b/tests/integration/implicit_session_bench_test.go @@ -5,56 +5,85 @@ package integration import ( "context" + "fmt" "testing" "github.com/stretchr/testify/require" "github.com/ydb-platform/ydb-go-sdk/v3" - "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" ) +// BenchmarkQuery_Query_WithImplicitSession +// Result: +// goos: darwin +// goarch: arm64 +// pkg: github.com/ydb-platform/ydb-go-sdk/v3/tests/integration +// cpu: Apple M3 Pro +// BenchmarkQuery_Query_WithImplicitSession/parallel-1-12 7020 193954 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-2-12 7903 173707 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-16-12 7006 156601 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-512-12 6811 165773 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-2048-12 8218 163119 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-16384-12 7093 170583 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-65536-12 6410 176477 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-131072-12 5841 179243 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-262144-12 5552 203478 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-393216-12 3854 274290 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-524288-12 4 251855177 ns/op +// BenchmarkQuery_Query_WithImplicitSession/parallel-1048576-12 1 1902566958 ns/op func BenchmarkQuery_Query_WithImplicitSession(b *testing.B) { - ctx := context.TODO() - db, err := ydb.Open(ctx, "grpc://localhost:2136/local") - if err != nil { - panic(err) - } - defer db.Close(ctx) // cleanup resources + benchOverQueryService(context.TODO(), b, + ydb.WithQueryConfigOption(config.WithImplicitSessions()), + ) +} - q := db.Query() +// BenchmarkQuery_Query +// Result: +// goos: darwin +// goarch: arm64 +// pkg: github.com/ydb-platform/ydb-go-sdk/v3/tests/integration +// cpu: Apple M3 Pro +// BenchmarkQuery_Query/parallel-1-12 9445 128672 ns/op +// BenchmarkQuery_Query/parallel-2-12 12777 100227 ns/op +// BenchmarkQuery_Query/parallel-16-12 13782 87532 ns/op +// BenchmarkQuery_Query/parallel-512-12 12950 92284 ns/op +// BenchmarkQuery_Query/parallel-2048-12 12091 96659 ns/op +// BenchmarkQuery_Query/parallel-16384-12 14293 94588 ns/op +// BenchmarkQuery_Query/parallel-65536-12 11144 96578 ns/op +// BenchmarkQuery_Query/parallel-131072-12 12848 103441 ns/op +// BenchmarkQuery_Query/parallel-262144-12 10414 120940 ns/op +// BenchmarkQuery_Query/parallel-393216-12 7090 154935 ns/op +// BenchmarkQuery_Query/parallel-524288-12 188 5381982 ns/op +// BenchmarkQuery_Query/parallel-1048576-12 1 1918443542 ns/op +func BenchmarkQuery_Query(b *testing.B) { + benchOverQueryService(context.TODO(), b) +} - // Warmup - _, err = q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) - require.NoError(b, err) +func benchOverQueryService(ctx context.Context, b *testing.B, driverOpts ...ydb.Option) { + goroutinesCnt := []int{1, 2, 16, 512, 2048, 16384, 65536, 131072, 262144, 393216, 524288, 1048576} - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, err := q.Query(ctx, `SELECT 42 as id, "my string" as myStr`, - query.WithImplicitSession(), - ) - require.NoError(b, err) - } - }) -} + for _, parallelism := range goroutinesCnt { + b.Run(fmt.Sprintf("parallel-%d", parallelism), func(b *testing.B) { + b.SetParallelism(parallelism) -func BenchmarkQuery_Query(b *testing.B) { - ctx := context.TODO() - db, err := ydb.Open(ctx, "grpc://localhost:2136/local") - if err != nil { - panic(err) - } - defer db.Close(ctx) // cleanup resources + db, err := ydb.Open(ctx, "grpc://localhost:2136/local", driverOpts...) - q := db.Query() + require.NoError(b, err) + defer db.Close(ctx) - // Warmup - _, err = q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) - require.NoError(b, err) + q := db.Query() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, err := q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + // Warmup + _, err = q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) require.NoError(b, err) - } - }) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + require.NoError(b, err) + } + }) + }) + } } From b951cd5188ac176689191ef3815c45ce5a6112c6 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Wed, 9 Jul 2025 17:53:00 +0300 Subject: [PATCH 09/18] fixes from Code Review --- testutil/driver.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/testutil/driver.go b/testutil/driver.go index 672bed768..42cc1f7cb 100644 --- a/testutil/driver.go +++ b/testutil/driver.go @@ -62,11 +62,7 @@ const ( TableStreamReadTable TableStreamExecuteScanQuery - QueryCreateSession QueryExecuteQuery - QueryAttachSession - QueryBeginTransaction - QueryCommitTransaction ) var grpcMethodToCode = map[Method]MethodCode{ @@ -89,11 +85,7 @@ var grpcMethodToCode = map[Method]MethodCode{ "/Ydb.Table.V1.TableService/StreamReadTable": TableStreamReadTable, "/Ydb.Table.V1.TableService/StreamExecuteScanQuery": TableStreamExecuteScanQuery, - "/Ydb.Query.V1.QueryService/ExecuteQuery": QueryExecuteQuery, - "/Ydb.Query.V1.QueryService/CreateSession": QueryCreateSession, - "/Ydb.Query.V1.QueryService/AttachSession": QueryAttachSession, - "/Ydb.Query.V1.QueryService/BeginTransaction": QueryBeginTransaction, - "/Ydb.Query.V1.QueryService/CommitTransaction": QueryCommitTransaction, + "/Ydb.Query.V1.QueryService/ExecuteQuery": QueryExecuteQuery, } var codeToString = map[MethodCode]string{ @@ -377,6 +369,9 @@ func (s *ClientStream) RecvMsg(m interface{}) error { return s.OnRecvMsg(m) } +// MockClientStream creates a mock ClientStream with predefined behavior for testing purposes. +// It simulates a client stream with a single message. +// The returned ClientStream can be used to mock gRPC stream interactions in unit tests. func MockClientStream() *ClientStream { var recvMsgAlreadySent bool From 6e888440f896d987b3e0cadbb234905309c1a1b6 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 10 Jul 2025 14:10:25 +0300 Subject: [PATCH 10/18] fixes from code review --- CHANGELOG.md | 3 +- internal/query/client.go | 56 ++++++++----------- internal/query/client_test.go | 28 +++++++++- internal/query/config/config.go | 6 +- internal/query/config/options.go | 4 +- internal/query/session_core.go | 4 ++ query/client.go | 10 ++++ query/example_test.go | 47 ++++++++++++++++ .../implicit_session_bench_test.go | 4 +- testutil/driver.go | 28 ---------- 10 files changed, 118 insertions(+), 72 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 90894806d..3e4e478c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,4 @@ -* Added a feature: "Execute queries without explicit session creation": - * added `config.WithImplicitSessions` query option for execute queries with implicit sessions +* Added `query.AllowImplicitSessions()` option for execute queries through `query.Client.{Exec,Query,QueryResultSet,QueryRow}` without explicit sessions ## v3.112.0 * Added support for the `json.Unmarshaler` interface in the `CastTo` function for use in scanners, such as the `ScanStruct` method diff --git a/internal/query/client.go b/internal/query/client.go index 6b9a9f523..1c143b758 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -41,9 +41,9 @@ type ( With(ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option) error } Client struct { - config *config.Config - client Ydb_Query_V1.QueryServiceClient - pool sessionPool + config *config.Config + client Ydb_Query_V1.QueryServiceClient + explicitSessionPool sessionPool // implicitSessionPool is a pool of implicit sessions, // i.e. fake sessions created without CreateSession/AttachSession requests. @@ -198,7 +198,7 @@ func (c *Client) Close(ctx context.Context) error { close(c.done) - if err := c.pool.Close(ctx); err != nil { + if err := c.explicitSessionPool.Close(ctx); err != nil { return xerrors.WithStackTrace(err) } @@ -246,7 +246,7 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO onDone(attempts, finalErr) }() - err := do(ctx, c.pool, + err := do(ctx, c.explicitSessionPool, func(ctx context.Context, s *Session) error { return op(ctx, s) }, @@ -333,12 +333,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute onDone(finalErr) }() - pool := c.pool - if c.config.IsImplicitSession() { - pool = c.implicitSessionPool - } - - row, err := clientQueryRow(ctx, pool, q, settings, withTrace(c.config.Trace())) + row, err := clientQueryRow(ctx, c.pool(), q, settings, withTrace(c.config.Trace())) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -385,12 +380,7 @@ func (c *Client) Exec(ctx context.Context, q string, opts ...options.Execute) (f onDone(finalErr) }() - pool := c.pool - if c.config.IsImplicitSession() { - pool = c.implicitSessionPool - } - - err := clientExec(ctx, pool, q, opts...) + err := clientExec(ctx, c.pool(), q, opts...) if err != nil { return xerrors.WithStackTrace(err) } @@ -438,12 +428,7 @@ func (c *Client) Query(ctx context.Context, q string, opts ...options.Execute) ( onDone(err) }() - pool := c.pool - if c.config.IsImplicitSession() { - pool = c.implicitSessionPool - } - - r, err = clientQuery(ctx, pool, q, opts...) + r, err = clientQuery(ctx, c.pool(), q, opts...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -498,12 +483,7 @@ func (c *Client) QueryResultSet( onDone(finalErr, rowsCount) }() - pool := c.pool - if c.config.IsImplicitSession() { - pool = c.implicitSessionPool - } - - rs, rowsCount, err = clientQueryResultSet(ctx, pool, q, settings, withTrace(c.config.Trace())) + rs, rowsCount, err = clientQueryResultSet(ctx, c.pool(), q, settings, withTrace(c.config.Trace())) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -511,6 +491,17 @@ func (c *Client) QueryResultSet( return rs, nil } +// pool returns the appropriate session pool based on the client configuration. +// If implicit sessions are enabled, it returns the implicit session pool; +// otherwise, it returns the explicit session pool. +func (c *Client) pool() sessionPool { + if c.config.AllowImplicitSessions() { + return c.implicitSessionPool + } + + return c.explicitSessionPool +} + func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options.DoTxOption) (finalErr error) { ctx, cancel := xcontext.WithDone(ctx, c.done) defer cancel() @@ -527,7 +518,7 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options onDone(attempts, finalErr) }() - err := doTx(ctx, c.pool, op, + err := doTx(ctx, c.explicitSessionPool, op, settings.TxSettings(), append( []retry.Option{ @@ -594,7 +585,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) * client: client, done: make(chan struct{}), implicitSessionPool: createImplicitSessionPool(ctx, cfg, client, cc), - pool: pool.New(ctx, + explicitSessionPool: pool.New(ctx, pool.WithLimit[*Session](cfg.PoolLimit()), pool.WithItemUsageLimit[*Session](cfg.PoolSessionUsageLimit()), pool.WithItemUsageTTL[*Session](cfg.PoolSessionUsageTTL()), @@ -649,6 +640,7 @@ func createImplicitSessionPool(ctx context.Context, ) sessionPool { return pool.New(ctx, pool.WithLimit[*Session](cfg.PoolLimit()), + pool.WithTrace[*Session](poolTrace(cfg.Trace())), pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) { core := &sessionCore{ cc: cc, @@ -657,8 +649,6 @@ func createImplicitSessionPool(ctx context.Context, done: make(chan struct{}), } - core.SetStatus(StatusIdle) - return &Session{ Core: core, trace: cfg.Trace(), diff --git a/internal/query/client_test.go b/internal/query/client_test.go index f9a01faeb..6a443ca63 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -1573,7 +1573,7 @@ func TestClient(t *testing.T) { // the mock in such way that calling `CreateSession` or `AttachSession` will result in an error. func mockClientForImplicitSessionTest(ctx context.Context) *Client { return New(ctx, mockConnForImplicitSessionTest(), config.New( - config.WithImplicitSessions(), + config.AllowImplicitSessions(), )) } @@ -1581,12 +1581,36 @@ func mockConnForImplicitSessionTest() grpc.ClientConnInterface { return testutil.NewBalancer( testutil.WithNewStreamHandlers(testutil.NewStreamHandlers{ testutil.QueryExecuteQuery: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { - return testutil.MockClientStream(), nil + return mockClientStream(), nil }, }), ) } +// mockClientStream creates a mock ClientStream. +// It simulates a client stream with a single message. +func mockClientStream() *testutil.ClientStream { + var recvMsgAlreadySent bool + + return &testutil.ClientStream{ + OnSendMsg: func(m any) error { return nil }, + OnCloseSend: func() error { return nil }, + OnRecvMsg: func(m any) error { + if recvMsgAlreadySent { + return io.EOF + } + recvMsgAlreadySent = true + + switch resp := m.(type) { + case *Ydb_Query.ExecuteQueryResponsePart: + resp.ResultSet = &Ydb.ResultSet{Rows: []*Ydb.Value{{}}} + } + + return nil + }, + } +} + type sessionControllerMock struct { id string status Status diff --git a/internal/query/config/config.go b/internal/query/config/config.go index b281eb734..9dc05f300 100644 --- a/internal/query/config/config.go +++ b/internal/query/config/config.go @@ -25,7 +25,7 @@ type Config struct { sessionDeleteTimeout time.Duration sessionIddleTimeToLive time.Duration - implicitSession bool + allowImplicitSessions bool lazyTx bool @@ -64,8 +64,8 @@ func (c *Config) PoolLimit() int { return c.poolLimit } -func (c *Config) IsImplicitSession() bool { - return c.implicitSession +func (c *Config) AllowImplicitSessions() bool { + return c.allowImplicitSessions } func (c *Config) PoolSessionUsageLimit() uint64 { diff --git a/internal/query/config/options.go b/internal/query/config/options.go index 156a09dae..d69e3c2a4 100644 --- a/internal/query/config/options.go +++ b/internal/query/config/options.go @@ -83,9 +83,9 @@ func WithSessionIdleTimeToLive(idleTimeToLive time.Duration) Option { // WithImplicitSessions is an option to execute queries using an implicit session // which allows the queries to be executed without explicitly creating a session. // Please note that requests with this option use a separate session pool. -func WithImplicitSessions() Option { +func AllowImplicitSessions() Option { return func(c *Config) { - c.implicitSession = true + c.allowImplicitSessions = true } } diff --git a/internal/query/session_core.go b/internal/query/session_core.go index c7652d3c6..d4e322012 100644 --- a/internal/query/session_core.go +++ b/internal/query/session_core.go @@ -260,6 +260,10 @@ func (core *sessionCore) deleteSession(ctx context.Context) (finalErr error) { return xerrors.WithStackTrace(err) } + if core.id == "" { + return nil + } + _, err := core.Client.DeleteSession(ctx, &Ydb_Query.DeleteSessionRequest{ SessionId: core.id, diff --git a/query/client.go b/query/client.go index 64eb42689..d2f4dd042 100644 --- a/query/client.go +++ b/query/client.go @@ -5,6 +5,7 @@ import ( "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -152,3 +153,12 @@ func WithLabel(lbl string) options.LabelOption { func WithRetryBudget(b budget.Budget) options.RetryOptionsOption { return options.WithRetryBudget(b) } + +// WithImplicitSessions is an option to execute queries using an implicit session +// which allows the queries to be executed without explicitly creating a session. +// Please note that requests with this option use a separate session pool. +// +// Working with `query.Client.{Exec,Query,QueryResultSet,QueryRow}`. +func AllowImplicitSessions() config.Option { + return config.AllowImplicitSessions() +} diff --git a/query/example_test.go b/query/example_test.go index ad58dfe6f..73fecfff3 100644 --- a/query/example_test.go +++ b/query/example_test.go @@ -533,3 +533,50 @@ func Example_executeScript() { } } } + +func ExampleAllowImplicitSessions() { + ctx := context.TODO() + + implicitSessionOpt := ydb.WithQueryConfigOption(query.AllowImplicitSessions()) + + db, err := ydb.Open(ctx, "grpc://localhost:2136/local", implicitSessionOpt) + if err != nil { + panic(err) + } + + defer db.Close(ctx) + + // Executes without sending `CreateSession` and `AttachSession` requests + res, err := db.Query().Query(ctx, `SELECT 42 as id, "my string" as myStr`) + if err != nil { + panic(err) + } + defer res.Close(ctx) + + for rs, err := range res.ResultSets(ctx) { + if err != nil { + panic(err) + } + + for row, err := range rs.Rows(ctx) { + if err != nil { + panic(err) + } + + var ( + id int64 + myStr string + ) + + err = row.Scan(&id, &myStr) + if err != nil { + panic(err) + } + + fmt.Printf("id=%v, myStr='%s'\n", id, myStr) + } + } + + // Output: + // id=42, myStr='my string' +} diff --git a/tests/integration/implicit_session_bench_test.go b/tests/integration/implicit_session_bench_test.go index 9c4aaf994..0139769e6 100644 --- a/tests/integration/implicit_session_bench_test.go +++ b/tests/integration/implicit_session_bench_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ydb-platform/ydb-go-sdk/v3" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" + "github.com/ydb-platform/ydb-go-sdk/v3/query" ) // BenchmarkQuery_Query_WithImplicitSession @@ -34,7 +34,7 @@ import ( // BenchmarkQuery_Query_WithImplicitSession/parallel-1048576-12 1 1902566958 ns/op func BenchmarkQuery_Query_WithImplicitSession(b *testing.B) { benchOverQueryService(context.TODO(), b, - ydb.WithQueryConfigOption(config.WithImplicitSessions()), + ydb.WithQueryConfigOption(query.AllowImplicitSessions()), ) } diff --git a/testutil/driver.go b/testutil/driver.go index 42cc1f7cb..7b4b15320 100644 --- a/testutil/driver.go +++ b/testutil/driver.go @@ -3,13 +3,10 @@ package testutil import ( "context" "fmt" - "io" "reflect" "strings" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" @@ -369,31 +366,6 @@ func (s *ClientStream) RecvMsg(m interface{}) error { return s.OnRecvMsg(m) } -// MockClientStream creates a mock ClientStream with predefined behavior for testing purposes. -// It simulates a client stream with a single message. -// The returned ClientStream can be used to mock gRPC stream interactions in unit tests. -func MockClientStream() *ClientStream { - var recvMsgAlreadySent bool - - return &ClientStream{ - OnSendMsg: func(m any) error { return nil }, - OnCloseSend: func() error { return nil }, - OnRecvMsg: func(m any) error { - if recvMsgAlreadySent { - return io.EOF - } - recvMsgAlreadySent = true - - switch resp := m.(type) { // you can freely add additional mock data - case *Ydb_Query.ExecuteQueryResponsePart: - resp.ResultSet = &Ydb.ResultSet{Rows: []*Ydb.Value{{}}} - } - - return nil - }, - } -} - func lastSegment(m string) string { s := strings.Split(m, "/") From 374c0ba32804530e1073a8b6d1e32e97e1ffef8a Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 10 Jul 2025 14:12:24 +0300 Subject: [PATCH 11/18] fixes from code review --- internal/query/session_core.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/internal/query/session_core.go b/internal/query/session_core.go index d4e322012..c7652d3c6 100644 --- a/internal/query/session_core.go +++ b/internal/query/session_core.go @@ -260,10 +260,6 @@ func (core *sessionCore) deleteSession(ctx context.Context) (finalErr error) { return xerrors.WithStackTrace(err) } - if core.id == "" { - return nil - } - _, err := core.Client.DeleteSession(ctx, &Ydb_Query.DeleteSessionRequest{ SessionId: core.id, From c3430c3c29fbbc5e473116412102d8c0258f7e57 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 10 Jul 2025 14:31:08 +0300 Subject: [PATCH 12/18] fixes from code review --- query/example_test.go | 47 ------------------------------------------- 1 file changed, 47 deletions(-) diff --git a/query/example_test.go b/query/example_test.go index 73fecfff3..ad58dfe6f 100644 --- a/query/example_test.go +++ b/query/example_test.go @@ -533,50 +533,3 @@ func Example_executeScript() { } } } - -func ExampleAllowImplicitSessions() { - ctx := context.TODO() - - implicitSessionOpt := ydb.WithQueryConfigOption(query.AllowImplicitSessions()) - - db, err := ydb.Open(ctx, "grpc://localhost:2136/local", implicitSessionOpt) - if err != nil { - panic(err) - } - - defer db.Close(ctx) - - // Executes without sending `CreateSession` and `AttachSession` requests - res, err := db.Query().Query(ctx, `SELECT 42 as id, "my string" as myStr`) - if err != nil { - panic(err) - } - defer res.Close(ctx) - - for rs, err := range res.ResultSets(ctx) { - if err != nil { - panic(err) - } - - for row, err := range rs.Rows(ctx) { - if err != nil { - panic(err) - } - - var ( - id int64 - myStr string - ) - - err = row.Scan(&id, &myStr) - if err != nil { - panic(err) - } - - fmt.Printf("id=%v, myStr='%s'\n", id, myStr) - } - } - - // Output: - // id=42, myStr='my string' -} From 7ea38090cd0cae07523e750e0952444819a261cc Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 10 Jul 2025 17:50:13 +0300 Subject: [PATCH 13/18] fixes from code review --- internal/query/client.go | 62 ++++++++++++++++++++--------------- internal/query/client_test.go | 62 +++++++++++------------------------ testutil/driver.go | 9 +++-- 3 files changed, 58 insertions(+), 75 deletions(-) diff --git a/internal/query/client.go b/internal/query/client.go index 1c143b758..79289a157 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -573,13 +573,21 @@ func CreateSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, } func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *Client { + client := Ydb_Query_V1.NewQueryServiceClient(cc) + + return newWithQueryServiceClient(ctx, client, cc, cfg) +} + +func newWithQueryServiceClient(ctx context.Context, + client Ydb_Query_V1.QueryServiceClient, + cc grpc.ClientConnInterface, + cfg *config.Config, +) *Client { onDone := trace.QueryOnNew(cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.New"), ) defer onDone() - client := Ydb_Query_V1.NewQueryServiceClient(cc) - return &Client{ config: cfg, client: client, @@ -633,31 +641,6 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) * } } -func createImplicitSessionPool(ctx context.Context, - cfg *config.Config, - c Ydb_Query_V1.QueryServiceClient, - cc grpc.ClientConnInterface, -) sessionPool { - return pool.New(ctx, - pool.WithLimit[*Session](cfg.PoolLimit()), - pool.WithTrace[*Session](poolTrace(cfg.Trace())), - pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) { - core := &sessionCore{ - cc: cc, - Client: c, - Trace: cfg.Trace(), - done: make(chan struct{}), - } - - return &Session{ - Core: core, - trace: cfg.Trace(), - client: c, - }, nil - }), - ) -} - func poolTrace(t *trace.Query) *pool.Trace { return &pool.Trace{ OnNew: func(ctx *context.Context, call stack.Caller) func(limit int) { @@ -707,3 +690,28 @@ func poolTrace(t *trace.Query) *pool.Trace { }, } } + +func createImplicitSessionPool(ctx context.Context, + cfg *config.Config, + c Ydb_Query_V1.QueryServiceClient, + cc grpc.ClientConnInterface, +) sessionPool { + return pool.New(ctx, + pool.WithLimit[*Session](cfg.PoolLimit()), + pool.WithTrace[*Session](poolTrace(cfg.Trace())), + pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) { + core := &sessionCore{ + cc: cc, + Client: c, + Trace: cfg.Trace(), + done: make(chan struct{}), + } + + return &Session{ + Core: core, + trace: cfg.Trace(), + client: c, + }, nil + }), + ) +} diff --git a/internal/query/client_test.go b/internal/query/client_test.go index 6a443ca63..e8a42a405 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -26,7 +26,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/query" - "github.com/ydb-platform/ydb-go-sdk/v3/testutil" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -845,8 +844,8 @@ func TestClient(t *testing.T) { require.NoError(t, err) }) - t.Run("WithImplicitSession", func(t *testing.T) { - err := mockClientForImplicitSessionTest(ctx). + t.Run("AllowImplicitSessions", func(t *testing.T) { + err := mockClientForImplicitSessionTest(ctx, t). Exec(ctx, "SELECT 1") require.NoError(t, err) @@ -1088,8 +1087,8 @@ func TestClient(t *testing.T) { require.Nil(t, r3) } }) - t.Run("WithImplicitSession", func(t *testing.T) { - _, err := mockClientForImplicitSessionTest(ctx). + t.Run("AllowImplicitSessions", func(t *testing.T) { + _, err := mockClientForImplicitSessionTest(ctx, t). Query(ctx, "SELECT 1") require.NoError(t, err) @@ -1412,8 +1411,8 @@ func TestClient(t *testing.T) { require.Nil(t, rs) require.Equal(t, 0, rowsCount) }) - t.Run("WithImplicitSession", func(t *testing.T) { - _, err := mockClientForImplicitSessionTest(ctx). + t.Run("AllowImplicitSessions", func(t *testing.T) { + _, err := mockClientForImplicitSessionTest(ctx, t). QueryResultSet(ctx, "SELECT 1") require.NoError(t, err) @@ -1559,8 +1558,8 @@ func TestClient(t *testing.T) { require.Nil(t, row) }) - t.Run("WithImplicitSession", func(t *testing.T) { - _, err := mockClientForImplicitSessionTest(ctx). + t.Run("AllowImplicitSessions", func(t *testing.T) { + _, err := mockClientForImplicitSessionTest(ctx, t). QueryRow(ctx, "SELECT 1") require.NoError(t, err) @@ -1571,44 +1570,21 @@ func TestClient(t *testing.T) { // mockClientForImplicitSessionTest creates a new Client with a test balancer // for simulating implicit session scenarios in query client testing. It configures // the mock in such way that calling `CreateSession` or `AttachSession` will result in an error. -func mockClientForImplicitSessionTest(ctx context.Context) *Client { - return New(ctx, mockConnForImplicitSessionTest(), config.New( - config.AllowImplicitSessions(), - )) -} +func mockClientForImplicitSessionTest(ctx context.Context, t *testing.T) *Client { + ctrl := gomock.NewController(t) -func mockConnForImplicitSessionTest() grpc.ClientConnInterface { - return testutil.NewBalancer( - testutil.WithNewStreamHandlers(testutil.NewStreamHandlers{ - testutil.QueryExecuteQuery: func(desc *grpc.StreamDesc) (grpc.ClientStream, error) { - return mockClientStream(), nil - }, - }), - ) -} + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + ResultSet: &Ydb.ResultSet{Rows: []*Ydb.Value{{}}}, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF) -// mockClientStream creates a mock ClientStream. -// It simulates a client stream with a single message. -func mockClientStream() *testutil.ClientStream { - var recvMsgAlreadySent bool - - return &testutil.ClientStream{ - OnSendMsg: func(m any) error { return nil }, - OnCloseSend: func() error { return nil }, - OnRecvMsg: func(m any) error { - if recvMsgAlreadySent { - return io.EOF - } - recvMsgAlreadySent = true + queryService := NewMockQueryServiceClient(ctrl) + queryService.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil) - switch resp := m.(type) { - case *Ydb_Query.ExecuteQueryResponsePart: - resp.ResultSet = &Ydb.ResultSet{Rows: []*Ydb.Value{{}}} - } + cfg := config.New(config.AllowImplicitSessions()) - return nil - }, - } + return newWithQueryServiceClient(ctx, queryService, nil, cfg) } type sessionControllerMock struct { diff --git a/testutil/driver.go b/testutil/driver.go index 7b4b15320..d1ffd8e7a 100644 --- a/testutil/driver.go +++ b/testutil/driver.go @@ -58,8 +58,6 @@ const ( TableDescribeTableOptions TableStreamReadTable TableStreamExecuteScanQuery - - QueryExecuteQuery ) var grpcMethodToCode = map[Method]MethodCode{ @@ -81,8 +79,6 @@ var grpcMethodToCode = map[Method]MethodCode{ "/Ydb.Table.V1.TableService/DescribeTableOptions": TableDescribeTableOptions, "/Ydb.Table.V1.TableService/StreamReadTable": TableStreamReadTable, "/Ydb.Table.V1.TableService/StreamExecuteScanQuery": TableStreamExecuteScanQuery, - - "/Ydb.Query.V1.QueryService/ExecuteQuery": QueryExecuteQuery, } var codeToString = map[MethodCode]string{ @@ -111,7 +107,10 @@ func setField(name string, dst, value interface{}) { t := x.Type() f, ok := t.FieldByName(name) if !ok { - return + panic(fmt.Sprintf( + "struct %s has no field %q", + t, name, + )) } v := reflect.ValueOf(value) if f.Type.Kind() != v.Type().Kind() { From 7f28f19d5e6968a36d38106dd1d41cb0c9d81c40 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 10 Jul 2025 19:03:56 +0300 Subject: [PATCH 14/18] fix traces --- internal/query/client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/query/client.go b/internal/query/client.go index 79289a157..30453ae7d 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -573,6 +573,11 @@ func CreateSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, } func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *Client { + onDone := trace.QueryOnNew(cfg.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.New"), + ) + defer onDone() + client := Ydb_Query_V1.NewQueryServiceClient(cc) return newWithQueryServiceClient(ctx, client, cc, cfg) @@ -583,11 +588,6 @@ func newWithQueryServiceClient(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config, ) *Client { - onDone := trace.QueryOnNew(cfg.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.New"), - ) - defer onDone() - return &Client{ config: cfg, client: client, From ecee369d1f572fd9b71dff9f436cf0a75e7a0a5f Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 10 Jul 2025 19:42:26 +0300 Subject: [PATCH 15/18] rename bench file --- ...on_bench_test.go => query_with_implicit_session_bench_test.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/integration/{implicit_session_bench_test.go => query_with_implicit_session_bench_test.go} (100%) diff --git a/tests/integration/implicit_session_bench_test.go b/tests/integration/query_with_implicit_session_bench_test.go similarity index 100% rename from tests/integration/implicit_session_bench_test.go rename to tests/integration/query_with_implicit_session_bench_test.go From 2cda4c05b7d5eb305302f1724dce2f566d8c62fa Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 10 Jul 2025 20:13:13 +0300 Subject: [PATCH 16/18] rename bench file --- internal/query/config/options.go | 3 -- query/client.go | 2 +- ...ery_allow_implicit_sessions_bench_test.go} | 28 +++++++++---------- 3 files changed, 15 insertions(+), 18 deletions(-) rename tests/integration/{query_with_implicit_session_bench_test.go => query_allow_implicit_sessions_bench_test.go} (65%) diff --git a/internal/query/config/options.go b/internal/query/config/options.go index d69e3c2a4..ed66d1845 100644 --- a/internal/query/config/options.go +++ b/internal/query/config/options.go @@ -80,9 +80,6 @@ func WithSessionIdleTimeToLive(idleTimeToLive time.Duration) Option { } } -// WithImplicitSessions is an option to execute queries using an implicit session -// which allows the queries to be executed without explicitly creating a session. -// Please note that requests with this option use a separate session pool. func AllowImplicitSessions() Option { return func(c *Config) { c.allowImplicitSessions = true diff --git a/query/client.go b/query/client.go index d2f4dd042..661bc8276 100644 --- a/query/client.go +++ b/query/client.go @@ -154,7 +154,7 @@ func WithRetryBudget(b budget.Budget) options.RetryOptionsOption { return options.WithRetryBudget(b) } -// WithImplicitSessions is an option to execute queries using an implicit session +// AllowImplicitSessions is an option to execute queries using an implicit session // which allows the queries to be executed without explicitly creating a session. // Please note that requests with this option use a separate session pool. // diff --git a/tests/integration/query_with_implicit_session_bench_test.go b/tests/integration/query_allow_implicit_sessions_bench_test.go similarity index 65% rename from tests/integration/query_with_implicit_session_bench_test.go rename to tests/integration/query_allow_implicit_sessions_bench_test.go index 0139769e6..360bd38a3 100644 --- a/tests/integration/query_with_implicit_session_bench_test.go +++ b/tests/integration/query_allow_implicit_sessions_bench_test.go @@ -14,25 +14,25 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/query" ) -// BenchmarkQuery_Query_WithImplicitSession +// BenchmarkQuery_Query_AllowImplicitSessions // Result: // goos: darwin // goarch: arm64 // pkg: github.com/ydb-platform/ydb-go-sdk/v3/tests/integration // cpu: Apple M3 Pro -// BenchmarkQuery_Query_WithImplicitSession/parallel-1-12 7020 193954 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-2-12 7903 173707 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-16-12 7006 156601 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-512-12 6811 165773 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-2048-12 8218 163119 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-16384-12 7093 170583 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-65536-12 6410 176477 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-131072-12 5841 179243 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-262144-12 5552 203478 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-393216-12 3854 274290 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-524288-12 4 251855177 ns/op -// BenchmarkQuery_Query_WithImplicitSession/parallel-1048576-12 1 1902566958 ns/op -func BenchmarkQuery_Query_WithImplicitSession(b *testing.B) { +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-1-12 7020 193954 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-2-12 7903 173707 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-16-12 7006 156601 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-512-12 6811 165773 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-2048-12 8218 163119 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-16384-12 7093 170583 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-65536-12 6410 176477 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-131072-12 5841 179243 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-262144-12 5552 203478 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-393216-12 3854 274290 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-524288-12 4 251855177 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-1048576-12 1 1902566958 ns/op +func BenchmarkQuery_Query_AllowImplicitSessions(b *testing.B) { benchOverQueryService(context.TODO(), b, ydb.WithQueryConfigOption(query.AllowImplicitSessions()), ) From 83ae1459682698830639d72a81a55700d4f7401e Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 10 Jul 2025 21:26:52 +0300 Subject: [PATCH 17/18] Update internal/query/client.go --- internal/query/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/query/client.go b/internal/query/client.go index 30453ae7d..af358e940 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -202,6 +202,10 @@ func (c *Client) Close(ctx context.Context) error { return xerrors.WithStackTrace(err) } + if err := c.implicitSessionPool.Close(ctx); err != nil { + return xerrors.WithStackTrace(err) + } + return nil } From 0db3f152fd2e7be63faaaaba5c0aa2928f32363b Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Fri, 11 Jul 2025 14:35:36 +0300 Subject: [PATCH 18/18] fix panic --- internal/query/client_test.go | 12 ++++++++++++ internal/query/session_core.go | 4 +++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/internal/query/client_test.go b/internal/query/client_test.go index e8a42a405..c54b78eed 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -1565,6 +1565,18 @@ func TestClient(t *testing.T) { require.NoError(t, err) }) }) + + t.Run("Close", func(t *testing.T) { + t.Run("AllowImplicitSessions", func(t *testing.T) { + client := mockClientForImplicitSessionTest(ctx, t) + _, err := client.QueryRow(ctx, "SELECT 1") + require.NoError(t, err) + + err = client.Close(context.Background()) + + require.NoError(t, err) + }) + }) } // mockClientForImplicitSessionTest creates a new Client with a test balancer diff --git a/internal/query/session_core.go b/internal/query/session_core.go index c7652d3c6..79394f08e 100644 --- a/internal/query/session_core.go +++ b/internal/query/session_core.go @@ -282,7 +282,9 @@ func (core *sessionCore) IsAlive() bool { } func (core *sessionCore) Close(ctx context.Context) (err error) { - defer core.closeOnce() + if core.closeOnce != nil { + defer core.closeOnce() + } select { case <-core.done: