Skip to content

Execute queries without explicit session creation #1827

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added `query.AllowImplicitSessions()` option for execute queries through `query.Client.{Exec,Query,QueryResultSet,QueryRow}` without explicit sessions

## v3.112.0
* Added support for the `json.Unmarshaler` interface in the `CastTo` function for use in scanners, such as the `ScanStruct` method
* Fixed the support of server-side session balancing in `database/sql` driver
Expand Down
3 changes: 2 additions & 1 deletion VERSIONING.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ We making the following exceptions to those guidelines:
- Some public API of `ydb-go-sdk` relate to the internals.
- We use the `// Internals` comment for public internals in the `ydb-go-sdk`.
- `ydb-go-sdk` internals can be changed at any time without increase of major part of version.
- Internals will never marked as stable
- Internals will never marked as stable
- `testutil` package can be changed at any time without increase of major part of version.
93 changes: 71 additions & 22 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ type (
With(ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option) error
}
Client struct {
config *config.Config
client Ydb_Query_V1.QueryServiceClient
pool sessionPool
config *config.Config
client Ydb_Query_V1.QueryServiceClient
explicitSessionPool sessionPool

// implicitSessionPool is a pool of implicit sessions,
// i.e. fake sessions created without CreateSession/AttachSession requests.
implicitSessionPool sessionPool

done chan struct{}
}
Expand Down Expand Up @@ -194,7 +198,7 @@ func (c *Client) Close(ctx context.Context) error {

close(c.done)

if err := c.pool.Close(ctx); err != nil {
if err := c.explicitSessionPool.Close(ctx); err != nil {
return xerrors.WithStackTrace(err)
}

Expand Down Expand Up @@ -242,7 +246,7 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO
onDone(attempts, finalErr)
}()

err := do(ctx, c.pool,
err := do(ctx, c.explicitSessionPool,
func(ctx context.Context, s *Session) error {
return op(ctx, s)
},
Expand Down Expand Up @@ -329,7 +333,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
onDone(finalErr)
}()

row, err := clientQueryRow(ctx, c.pool, q, settings, withTrace(c.config.Trace()))
row, err := clientQueryRow(ctx, c.pool(), q, settings, withTrace(c.config.Trace()))
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -376,7 +380,7 @@ func (c *Client) Exec(ctx context.Context, q string, opts ...options.Execute) (f
onDone(finalErr)
}()

err := clientExec(ctx, c.pool, q, opts...)
err := clientExec(ctx, c.pool(), q, opts...)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -424,7 +428,7 @@ func (c *Client) Query(ctx context.Context, q string, opts ...options.Execute) (
onDone(err)
}()

r, err = clientQuery(ctx, c.pool, q, opts...)
r, err = clientQuery(ctx, c.pool(), q, opts...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -479,14 +483,25 @@ func (c *Client) QueryResultSet(
onDone(finalErr, rowsCount)
}()

rs, rowsCount, err = clientQueryResultSet(ctx, c.pool, q, settings, withTrace(c.config.Trace()))
rs, rowsCount, err = clientQueryResultSet(ctx, c.pool(), q, settings, withTrace(c.config.Trace()))
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return rs, nil
}

// pool returns the appropriate session pool based on the client configuration.
// If implicit sessions are enabled, it returns the implicit session pool;
// otherwise, it returns the explicit session pool.
func (c *Client) pool() sessionPool {
if c.config.AllowImplicitSessions() {
return c.implicitSessionPool
}

return c.explicitSessionPool
}

func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options.DoTxOption) (finalErr error) {
ctx, cancel := xcontext.WithDone(ctx, c.done)
defer cancel()
Expand All @@ -503,7 +518,7 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
onDone(attempts, finalErr)
}()

err := doTx(ctx, c.pool, op,
err := doTx(ctx, c.explicitSessionPool, op,
settings.TxSettings(),
append(
[]retry.Option{
Expand Down Expand Up @@ -565,25 +580,34 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *

client := Ydb_Query_V1.NewQueryServiceClient(cc)

return newWithQueryServiceClient(ctx, client, cc, cfg)
}

func newWithQueryServiceClient(ctx context.Context,
client Ydb_Query_V1.QueryServiceClient,
cc grpc.ClientConnInterface,
cfg *config.Config,
) *Client {
return &Client{
config: cfg,
client: client,
done: make(chan struct{}),
pool: pool.New(ctx,
pool.WithLimit[*Session, Session](cfg.PoolLimit()),
pool.WithItemUsageLimit[*Session, Session](cfg.PoolSessionUsageLimit()),
pool.WithItemUsageTTL[*Session, Session](cfg.PoolSessionUsageTTL()),
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()),
pool.WithMustDeleteItemFunc[*Session, Session](func(s *Session, err error) bool {
config: cfg,
client: client,
done: make(chan struct{}),
implicitSessionPool: createImplicitSessionPool(ctx, cfg, client, cc),
explicitSessionPool: pool.New(ctx,
pool.WithLimit[*Session](cfg.PoolLimit()),
pool.WithItemUsageLimit[*Session](cfg.PoolSessionUsageLimit()),
pool.WithItemUsageTTL[*Session](cfg.PoolSessionUsageTTL()),
pool.WithTrace[*Session](poolTrace(cfg.Trace())),
pool.WithCreateItemTimeout[*Session](cfg.SessionCreateTimeout()),
pool.WithCloseItemTimeout[*Session](cfg.SessionDeleteTimeout()),
pool.WithMustDeleteItemFunc(func(s *Session, err error) bool {
if !s.IsAlive() {
return true
}

return err != nil && xerrors.MustDeleteTableOrQuerySession(err)
}),
pool.WithIdleTimeToLive[*Session, Session](cfg.SessionIdleTimeToLive()),
pool.WithIdleTimeToLive[*Session](cfg.SessionIdleTimeToLive()),
pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) {
var (
createCtx context.Context
Expand Down Expand Up @@ -666,3 +690,28 @@ func poolTrace(t *trace.Query) *pool.Trace {
},
}
}

func createImplicitSessionPool(ctx context.Context,
cfg *config.Config,
c Ydb_Query_V1.QueryServiceClient,
cc grpc.ClientConnInterface,
) sessionPool {
return pool.New(ctx,
pool.WithLimit[*Session](cfg.PoolLimit()),
pool.WithTrace[*Session](poolTrace(cfg.Trace())),
pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) {
core := &sessionCore{
cc: cc,
Client: c,
Trace: cfg.Trace(),
done: make(chan struct{}),
}

return &Session{
Core: core,
trace: cfg.Trace(),
client: c,
}, nil
}),
)
}
47 changes: 47 additions & 0 deletions internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"google.golang.org/protobuf/types/known/anypb"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
Expand Down Expand Up @@ -842,6 +843,13 @@ func TestClient(t *testing.T) {
}), "")
require.NoError(t, err)
})

t.Run("AllowImplicitSessions", func(t *testing.T) {
err := mockClientForImplicitSessionTest(ctx, t).
Exec(ctx, "SELECT 1")

require.NoError(t, err)
})
})
t.Run("Query", func(t *testing.T) {
t.Run("HappyWay", func(t *testing.T) {
Expand Down Expand Up @@ -1079,6 +1087,12 @@ func TestClient(t *testing.T) {
require.Nil(t, r3)
}
})
t.Run("AllowImplicitSessions", func(t *testing.T) {
_, err := mockClientForImplicitSessionTest(ctx, t).
Query(ctx, "SELECT 1")

require.NoError(t, err)
})
})
t.Run("QueryResultSet", func(t *testing.T) {
t.Run("HappyWay", func(t *testing.T) {
Expand Down Expand Up @@ -1397,6 +1411,12 @@ func TestClient(t *testing.T) {
require.Nil(t, rs)
require.Equal(t, 0, rowsCount)
})
t.Run("AllowImplicitSessions", func(t *testing.T) {
_, err := mockClientForImplicitSessionTest(ctx, t).
QueryResultSet(ctx, "SELECT 1")

require.NoError(t, err)
})
})
t.Run("QueryRow", func(t *testing.T) {
t.Run("HappyWay", func(t *testing.T) {
Expand Down Expand Up @@ -1537,9 +1557,36 @@ func TestClient(t *testing.T) {
require.ErrorIs(t, err, errMoreThanOneRow)
require.Nil(t, row)
})

t.Run("AllowImplicitSessions", func(t *testing.T) {
_, err := mockClientForImplicitSessionTest(ctx, t).
QueryRow(ctx, "SELECT 1")

require.NoError(t, err)
})
})
}

// mockClientForImplicitSessionTest creates a new Client with a test balancer
// for simulating implicit session scenarios in query client testing. It configures
// the mock in such way that calling `CreateSession` or `AttachSession` will result in an error.
func mockClientForImplicitSessionTest(ctx context.Context, t *testing.T) *Client {
ctrl := gomock.NewController(t)

stream := NewMockQueryService_ExecuteQueryClient(ctrl)
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
ResultSet: &Ydb.ResultSet{Rows: []*Ydb.Value{{}}},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)

queryService := NewMockQueryServiceClient(ctrl)
queryService.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)

cfg := config.New(config.AllowImplicitSessions())

return newWithQueryServiceClient(ctx, queryService, nil, cfg)
}

type sessionControllerMock struct {
id string
status Status
Expand Down
6 changes: 6 additions & 0 deletions internal/query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Config struct {
sessionDeleteTimeout time.Duration
sessionIddleTimeToLive time.Duration

allowImplicitSessions bool

lazyTx bool

trace *trace.Query
Expand Down Expand Up @@ -62,6 +64,10 @@ func (c *Config) PoolLimit() int {
return c.poolLimit
}

func (c *Config) AllowImplicitSessions() bool {
return c.allowImplicitSessions
}

func (c *Config) PoolSessionUsageLimit() uint64 {
return c.poolSessionUsageLimit
}
Expand Down
6 changes: 6 additions & 0 deletions internal/query/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func WithSessionIdleTimeToLive(idleTimeToLive time.Duration) Option {
}
}

func AllowImplicitSessions() Option {
return func(c *Config) {
c.allowImplicitSessions = true
}
}

func WithLazyTx(lazyTx bool) Option {
return func(c *Config) {
c.lazyTx = lazyTx
Expand Down
10 changes: 10 additions & 0 deletions query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/retry/budget"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
Expand Down Expand Up @@ -152,3 +153,12 @@ func WithLabel(lbl string) options.LabelOption {
func WithRetryBudget(b budget.Budget) options.RetryOptionsOption {
return options.WithRetryBudget(b)
}

// AllowImplicitSessions is an option to execute queries using an implicit session
// which allows the queries to be executed without explicitly creating a session.
// Please note that requests with this option use a separate session pool.
//
// Working with `query.Client.{Exec,Query,QueryResultSet,QueryRow}`.
func AllowImplicitSessions() config.Option {
return config.AllowImplicitSessions()
}
Loading
Loading