diff --git a/CHANGELOG.md b/CHANGELOG.md index dec0985a4..3e4e478c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added `query.AllowImplicitSessions()` option for execute queries through `query.Client.{Exec,Query,QueryResultSet,QueryRow}` without explicit sessions + ## v3.112.0 * Added support for the `json.Unmarshaler` interface in the `CastTo` function for use in scanners, such as the `ScanStruct` method * Fixed the support of server-side session balancing in `database/sql` driver diff --git a/VERSIONING.md b/VERSIONING.md index 40cb9171c..5054ad5b8 100644 --- a/VERSIONING.md +++ b/VERSIONING.md @@ -21,4 +21,5 @@ We making the following exceptions to those guidelines: - Some public API of `ydb-go-sdk` relate to the internals. - We use the `// Internals` comment for public internals in the `ydb-go-sdk`. - `ydb-go-sdk` internals can be changed at any time without increase of major part of version. - - Internals will never marked as stable + - Internals will never marked as stable + - `testutil` package can be changed at any time without increase of major part of version. diff --git a/internal/query/client.go b/internal/query/client.go index 05491c1f8..af358e940 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -41,9 +41,13 @@ type ( With(ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option) error } Client struct { - config *config.Config - client Ydb_Query_V1.QueryServiceClient - pool sessionPool + config *config.Config + client Ydb_Query_V1.QueryServiceClient + explicitSessionPool sessionPool + + // implicitSessionPool is a pool of implicit sessions, + // i.e. fake sessions created without CreateSession/AttachSession requests. + implicitSessionPool sessionPool done chan struct{} } @@ -194,7 +198,11 @@ func (c *Client) Close(ctx context.Context) error { close(c.done) - if err := c.pool.Close(ctx); err != nil { + if err := c.explicitSessionPool.Close(ctx); err != nil { + return xerrors.WithStackTrace(err) + } + + if err := c.implicitSessionPool.Close(ctx); err != nil { return xerrors.WithStackTrace(err) } @@ -242,7 +250,7 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO onDone(attempts, finalErr) }() - err := do(ctx, c.pool, + err := do(ctx, c.explicitSessionPool, func(ctx context.Context, s *Session) error { return op(ctx, s) }, @@ -329,7 +337,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute onDone(finalErr) }() - row, err := clientQueryRow(ctx, c.pool, q, settings, withTrace(c.config.Trace())) + row, err := clientQueryRow(ctx, c.pool(), q, settings, withTrace(c.config.Trace())) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -376,7 +384,7 @@ func (c *Client) Exec(ctx context.Context, q string, opts ...options.Execute) (f onDone(finalErr) }() - err := clientExec(ctx, c.pool, q, opts...) + err := clientExec(ctx, c.pool(), q, opts...) if err != nil { return xerrors.WithStackTrace(err) } @@ -424,7 +432,7 @@ func (c *Client) Query(ctx context.Context, q string, opts ...options.Execute) ( onDone(err) }() - r, err = clientQuery(ctx, c.pool, q, opts...) + r, err = clientQuery(ctx, c.pool(), q, opts...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -479,7 +487,7 @@ func (c *Client) QueryResultSet( onDone(finalErr, rowsCount) }() - rs, rowsCount, err = clientQueryResultSet(ctx, c.pool, q, settings, withTrace(c.config.Trace())) + rs, rowsCount, err = clientQueryResultSet(ctx, c.pool(), q, settings, withTrace(c.config.Trace())) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -487,6 +495,17 @@ func (c *Client) QueryResultSet( return rs, nil } +// pool returns the appropriate session pool based on the client configuration. +// If implicit sessions are enabled, it returns the implicit session pool; +// otherwise, it returns the explicit session pool. +func (c *Client) pool() sessionPool { + if c.config.AllowImplicitSessions() { + return c.implicitSessionPool + } + + return c.explicitSessionPool +} + func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options.DoTxOption) (finalErr error) { ctx, cancel := xcontext.WithDone(ctx, c.done) defer cancel() @@ -503,7 +522,7 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options onDone(attempts, finalErr) }() - err := doTx(ctx, c.pool, op, + err := doTx(ctx, c.explicitSessionPool, op, settings.TxSettings(), append( []retry.Option{ @@ -565,25 +584,34 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) * client := Ydb_Query_V1.NewQueryServiceClient(cc) + return newWithQueryServiceClient(ctx, client, cc, cfg) +} + +func newWithQueryServiceClient(ctx context.Context, + client Ydb_Query_V1.QueryServiceClient, + cc grpc.ClientConnInterface, + cfg *config.Config, +) *Client { return &Client{ - config: cfg, - client: client, - done: make(chan struct{}), - pool: pool.New(ctx, - pool.WithLimit[*Session, Session](cfg.PoolLimit()), - pool.WithItemUsageLimit[*Session, Session](cfg.PoolSessionUsageLimit()), - pool.WithItemUsageTTL[*Session, Session](cfg.PoolSessionUsageTTL()), - pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())), - pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()), - pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()), - pool.WithMustDeleteItemFunc[*Session, Session](func(s *Session, err error) bool { + config: cfg, + client: client, + done: make(chan struct{}), + implicitSessionPool: createImplicitSessionPool(ctx, cfg, client, cc), + explicitSessionPool: pool.New(ctx, + pool.WithLimit[*Session](cfg.PoolLimit()), + pool.WithItemUsageLimit[*Session](cfg.PoolSessionUsageLimit()), + pool.WithItemUsageTTL[*Session](cfg.PoolSessionUsageTTL()), + pool.WithTrace[*Session](poolTrace(cfg.Trace())), + pool.WithCreateItemTimeout[*Session](cfg.SessionCreateTimeout()), + pool.WithCloseItemTimeout[*Session](cfg.SessionDeleteTimeout()), + pool.WithMustDeleteItemFunc(func(s *Session, err error) bool { if !s.IsAlive() { return true } return err != nil && xerrors.MustDeleteTableOrQuerySession(err) }), - pool.WithIdleTimeToLive[*Session, Session](cfg.SessionIdleTimeToLive()), + pool.WithIdleTimeToLive[*Session](cfg.SessionIdleTimeToLive()), pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) { var ( createCtx context.Context @@ -666,3 +694,28 @@ func poolTrace(t *trace.Query) *pool.Trace { }, } } + +func createImplicitSessionPool(ctx context.Context, + cfg *config.Config, + c Ydb_Query_V1.QueryServiceClient, + cc grpc.ClientConnInterface, +) sessionPool { + return pool.New(ctx, + pool.WithLimit[*Session](cfg.PoolLimit()), + pool.WithTrace[*Session](poolTrace(cfg.Trace())), + pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) { + core := &sessionCore{ + cc: cc, + Client: c, + Trace: cfg.Trace(), + done: make(chan struct{}), + } + + return &Session{ + Core: core, + trace: cfg.Trace(), + client: c, + }, nil + }), + ) +} diff --git a/internal/query/client_test.go b/internal/query/client_test.go index 1994c9112..c54b78eed 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -20,6 +20,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -842,6 +843,13 @@ func TestClient(t *testing.T) { }), "") require.NoError(t, err) }) + + t.Run("AllowImplicitSessions", func(t *testing.T) { + err := mockClientForImplicitSessionTest(ctx, t). + Exec(ctx, "SELECT 1") + + require.NoError(t, err) + }) }) t.Run("Query", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -1079,6 +1087,12 @@ func TestClient(t *testing.T) { require.Nil(t, r3) } }) + t.Run("AllowImplicitSessions", func(t *testing.T) { + _, err := mockClientForImplicitSessionTest(ctx, t). + Query(ctx, "SELECT 1") + + require.NoError(t, err) + }) }) t.Run("QueryResultSet", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -1397,6 +1411,12 @@ func TestClient(t *testing.T) { require.Nil(t, rs) require.Equal(t, 0, rowsCount) }) + t.Run("AllowImplicitSessions", func(t *testing.T) { + _, err := mockClientForImplicitSessionTest(ctx, t). + QueryResultSet(ctx, "SELECT 1") + + require.NoError(t, err) + }) }) t.Run("QueryRow", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -1537,7 +1557,46 @@ func TestClient(t *testing.T) { require.ErrorIs(t, err, errMoreThanOneRow) require.Nil(t, row) }) + + t.Run("AllowImplicitSessions", func(t *testing.T) { + _, err := mockClientForImplicitSessionTest(ctx, t). + QueryRow(ctx, "SELECT 1") + + require.NoError(t, err) + }) }) + + t.Run("Close", func(t *testing.T) { + t.Run("AllowImplicitSessions", func(t *testing.T) { + client := mockClientForImplicitSessionTest(ctx, t) + _, err := client.QueryRow(ctx, "SELECT 1") + require.NoError(t, err) + + err = client.Close(context.Background()) + + require.NoError(t, err) + }) + }) +} + +// mockClientForImplicitSessionTest creates a new Client with a test balancer +// for simulating implicit session scenarios in query client testing. It configures +// the mock in such way that calling `CreateSession` or `AttachSession` will result in an error. +func mockClientForImplicitSessionTest(ctx context.Context, t *testing.T) *Client { + ctrl := gomock.NewController(t) + + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + ResultSet: &Ydb.ResultSet{Rows: []*Ydb.Value{{}}}, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF) + + queryService := NewMockQueryServiceClient(ctrl) + queryService.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil) + + cfg := config.New(config.AllowImplicitSessions()) + + return newWithQueryServiceClient(ctx, queryService, nil, cfg) } type sessionControllerMock struct { diff --git a/internal/query/config/config.go b/internal/query/config/config.go index 8eb221f38..9dc05f300 100644 --- a/internal/query/config/config.go +++ b/internal/query/config/config.go @@ -25,6 +25,8 @@ type Config struct { sessionDeleteTimeout time.Duration sessionIddleTimeToLive time.Duration + allowImplicitSessions bool + lazyTx bool trace *trace.Query @@ -62,6 +64,10 @@ func (c *Config) PoolLimit() int { return c.poolLimit } +func (c *Config) AllowImplicitSessions() bool { + return c.allowImplicitSessions +} + func (c *Config) PoolSessionUsageLimit() uint64 { return c.poolSessionUsageLimit } diff --git a/internal/query/config/options.go b/internal/query/config/options.go index ad7bda207..ed66d1845 100644 --- a/internal/query/config/options.go +++ b/internal/query/config/options.go @@ -80,6 +80,12 @@ func WithSessionIdleTimeToLive(idleTimeToLive time.Duration) Option { } } +func AllowImplicitSessions() Option { + return func(c *Config) { + c.allowImplicitSessions = true + } +} + func WithLazyTx(lazyTx bool) Option { return func(c *Config) { c.lazyTx = lazyTx diff --git a/internal/query/session_core.go b/internal/query/session_core.go index c7652d3c6..79394f08e 100644 --- a/internal/query/session_core.go +++ b/internal/query/session_core.go @@ -282,7 +282,9 @@ func (core *sessionCore) IsAlive() bool { } func (core *sessionCore) Close(ctx context.Context) (err error) { - defer core.closeOnce() + if core.closeOnce != nil { + defer core.closeOnce() + } select { case <-core.done: diff --git a/query/client.go b/query/client.go index 64eb42689..661bc8276 100644 --- a/query/client.go +++ b/query/client.go @@ -5,6 +5,7 @@ import ( "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -152,3 +153,12 @@ func WithLabel(lbl string) options.LabelOption { func WithRetryBudget(b budget.Budget) options.RetryOptionsOption { return options.WithRetryBudget(b) } + +// AllowImplicitSessions is an option to execute queries using an implicit session +// which allows the queries to be executed without explicitly creating a session. +// Please note that requests with this option use a separate session pool. +// +// Working with `query.Client.{Exec,Query,QueryResultSet,QueryRow}`. +func AllowImplicitSessions() config.Option { + return config.AllowImplicitSessions() +} diff --git a/tests/integration/query_allow_implicit_sessions_bench_test.go b/tests/integration/query_allow_implicit_sessions_bench_test.go new file mode 100644 index 000000000..360bd38a3 --- /dev/null +++ b/tests/integration/query_allow_implicit_sessions_bench_test.go @@ -0,0 +1,89 @@ +//go:build integration +// +build integration + +package integration + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/query" +) + +// BenchmarkQuery_Query_AllowImplicitSessions +// Result: +// goos: darwin +// goarch: arm64 +// pkg: github.com/ydb-platform/ydb-go-sdk/v3/tests/integration +// cpu: Apple M3 Pro +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-1-12 7020 193954 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-2-12 7903 173707 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-16-12 7006 156601 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-512-12 6811 165773 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-2048-12 8218 163119 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-16384-12 7093 170583 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-65536-12 6410 176477 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-131072-12 5841 179243 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-262144-12 5552 203478 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-393216-12 3854 274290 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-524288-12 4 251855177 ns/op +// BenchmarkQuery_Query_AllowImplicitSessions/parallel-1048576-12 1 1902566958 ns/op +func BenchmarkQuery_Query_AllowImplicitSessions(b *testing.B) { + benchOverQueryService(context.TODO(), b, + ydb.WithQueryConfigOption(query.AllowImplicitSessions()), + ) +} + +// BenchmarkQuery_Query +// Result: +// goos: darwin +// goarch: arm64 +// pkg: github.com/ydb-platform/ydb-go-sdk/v3/tests/integration +// cpu: Apple M3 Pro +// BenchmarkQuery_Query/parallel-1-12 9445 128672 ns/op +// BenchmarkQuery_Query/parallel-2-12 12777 100227 ns/op +// BenchmarkQuery_Query/parallel-16-12 13782 87532 ns/op +// BenchmarkQuery_Query/parallel-512-12 12950 92284 ns/op +// BenchmarkQuery_Query/parallel-2048-12 12091 96659 ns/op +// BenchmarkQuery_Query/parallel-16384-12 14293 94588 ns/op +// BenchmarkQuery_Query/parallel-65536-12 11144 96578 ns/op +// BenchmarkQuery_Query/parallel-131072-12 12848 103441 ns/op +// BenchmarkQuery_Query/parallel-262144-12 10414 120940 ns/op +// BenchmarkQuery_Query/parallel-393216-12 7090 154935 ns/op +// BenchmarkQuery_Query/parallel-524288-12 188 5381982 ns/op +// BenchmarkQuery_Query/parallel-1048576-12 1 1918443542 ns/op +func BenchmarkQuery_Query(b *testing.B) { + benchOverQueryService(context.TODO(), b) +} + +func benchOverQueryService(ctx context.Context, b *testing.B, driverOpts ...ydb.Option) { + goroutinesCnt := []int{1, 2, 16, 512, 2048, 16384, 65536, 131072, 262144, 393216, 524288, 1048576} + + for _, parallelism := range goroutinesCnt { + b.Run(fmt.Sprintf("parallel-%d", parallelism), func(b *testing.B) { + b.SetParallelism(parallelism) + + db, err := ydb.Open(ctx, "grpc://localhost:2136/local", driverOpts...) + + require.NoError(b, err) + defer db.Close(ctx) + + q := db.Query() + + // Warmup + _, err = q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + require.NoError(b, err) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := q.Query(ctx, `SELECT 42 as id, "my string" as myStr`) + require.NoError(b, err) + } + }) + }) + } +}