Skip to content

Commit 81b0420

Browse files
Execute queries without explicit session creation (#1827)
* Execute queries without explicit session creation --------- Co-authored-by: Aleksey Myasnikov <asmyasnikov@ydb.tech>
1 parent e7e1a2b commit 81b0420

File tree

9 files changed

+252
-24
lines changed

9 files changed

+252
-24
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `query.AllowImplicitSessions()` option for execute queries through `query.Client.{Exec,Query,QueryResultSet,QueryRow}` without explicit sessions
2+
13
## v3.112.0
24
* Added support for the `json.Unmarshaler` interface in the `CastTo` function for use in scanners, such as the `ScanStruct` method
35
* Fixed the support of server-side session balancing in `database/sql` driver

VERSIONING.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ We making the following exceptions to those guidelines:
2121
- Some public API of `ydb-go-sdk` relate to the internals.
2222
- We use the `// Internals` comment for public internals in the `ydb-go-sdk`.
2323
- `ydb-go-sdk` internals can be changed at any time without increase of major part of version.
24-
- Internals will never marked as stable
24+
- Internals will never marked as stable
25+
- `testutil` package can be changed at any time without increase of major part of version.

internal/query/client.go

Lines changed: 75 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,13 @@ type (
4141
With(ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option) error
4242
}
4343
Client struct {
44-
config *config.Config
45-
client Ydb_Query_V1.QueryServiceClient
46-
pool sessionPool
44+
config *config.Config
45+
client Ydb_Query_V1.QueryServiceClient
46+
explicitSessionPool sessionPool
47+
48+
// implicitSessionPool is a pool of implicit sessions,
49+
// i.e. fake sessions created without CreateSession/AttachSession requests.
50+
implicitSessionPool sessionPool
4751

4852
done chan struct{}
4953
}
@@ -194,7 +198,11 @@ func (c *Client) Close(ctx context.Context) error {
194198

195199
close(c.done)
196200

197-
if err := c.pool.Close(ctx); err != nil {
201+
if err := c.explicitSessionPool.Close(ctx); err != nil {
202+
return xerrors.WithStackTrace(err)
203+
}
204+
205+
if err := c.implicitSessionPool.Close(ctx); err != nil {
198206
return xerrors.WithStackTrace(err)
199207
}
200208

@@ -242,7 +250,7 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO
242250
onDone(attempts, finalErr)
243251
}()
244252

245-
err := do(ctx, c.pool,
253+
err := do(ctx, c.explicitSessionPool,
246254
func(ctx context.Context, s *Session) error {
247255
return op(ctx, s)
248256
},
@@ -329,7 +337,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
329337
onDone(finalErr)
330338
}()
331339

332-
row, err := clientQueryRow(ctx, c.pool, q, settings, withTrace(c.config.Trace()))
340+
row, err := clientQueryRow(ctx, c.pool(), q, settings, withTrace(c.config.Trace()))
333341
if err != nil {
334342
return nil, xerrors.WithStackTrace(err)
335343
}
@@ -376,7 +384,7 @@ func (c *Client) Exec(ctx context.Context, q string, opts ...options.Execute) (f
376384
onDone(finalErr)
377385
}()
378386

379-
err := clientExec(ctx, c.pool, q, opts...)
387+
err := clientExec(ctx, c.pool(), q, opts...)
380388
if err != nil {
381389
return xerrors.WithStackTrace(err)
382390
}
@@ -424,7 +432,7 @@ func (c *Client) Query(ctx context.Context, q string, opts ...options.Execute) (
424432
onDone(err)
425433
}()
426434

427-
r, err = clientQuery(ctx, c.pool, q, opts...)
435+
r, err = clientQuery(ctx, c.pool(), q, opts...)
428436
if err != nil {
429437
return nil, xerrors.WithStackTrace(err)
430438
}
@@ -479,14 +487,25 @@ func (c *Client) QueryResultSet(
479487
onDone(finalErr, rowsCount)
480488
}()
481489

482-
rs, rowsCount, err = clientQueryResultSet(ctx, c.pool, q, settings, withTrace(c.config.Trace()))
490+
rs, rowsCount, err = clientQueryResultSet(ctx, c.pool(), q, settings, withTrace(c.config.Trace()))
483491
if err != nil {
484492
return nil, xerrors.WithStackTrace(err)
485493
}
486494

487495
return rs, nil
488496
}
489497

498+
// pool returns the appropriate session pool based on the client configuration.
499+
// If implicit sessions are enabled, it returns the implicit session pool;
500+
// otherwise, it returns the explicit session pool.
501+
func (c *Client) pool() sessionPool {
502+
if c.config.AllowImplicitSessions() {
503+
return c.implicitSessionPool
504+
}
505+
506+
return c.explicitSessionPool
507+
}
508+
490509
func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options.DoTxOption) (finalErr error) {
491510
ctx, cancel := xcontext.WithDone(ctx, c.done)
492511
defer cancel()
@@ -503,7 +522,7 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
503522
onDone(attempts, finalErr)
504523
}()
505524

506-
err := doTx(ctx, c.pool, op,
525+
err := doTx(ctx, c.explicitSessionPool, op,
507526
settings.TxSettings(),
508527
append(
509528
[]retry.Option{
@@ -565,25 +584,34 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *
565584

566585
client := Ydb_Query_V1.NewQueryServiceClient(cc)
567586

587+
return newWithQueryServiceClient(ctx, client, cc, cfg)
588+
}
589+
590+
func newWithQueryServiceClient(ctx context.Context,
591+
client Ydb_Query_V1.QueryServiceClient,
592+
cc grpc.ClientConnInterface,
593+
cfg *config.Config,
594+
) *Client {
568595
return &Client{
569-
config: cfg,
570-
client: client,
571-
done: make(chan struct{}),
572-
pool: pool.New(ctx,
573-
pool.WithLimit[*Session, Session](cfg.PoolLimit()),
574-
pool.WithItemUsageLimit[*Session, Session](cfg.PoolSessionUsageLimit()),
575-
pool.WithItemUsageTTL[*Session, Session](cfg.PoolSessionUsageTTL()),
576-
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
577-
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
578-
pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()),
579-
pool.WithMustDeleteItemFunc[*Session, Session](func(s *Session, err error) bool {
596+
config: cfg,
597+
client: client,
598+
done: make(chan struct{}),
599+
implicitSessionPool: createImplicitSessionPool(ctx, cfg, client, cc),
600+
explicitSessionPool: pool.New(ctx,
601+
pool.WithLimit[*Session](cfg.PoolLimit()),
602+
pool.WithItemUsageLimit[*Session](cfg.PoolSessionUsageLimit()),
603+
pool.WithItemUsageTTL[*Session](cfg.PoolSessionUsageTTL()),
604+
pool.WithTrace[*Session](poolTrace(cfg.Trace())),
605+
pool.WithCreateItemTimeout[*Session](cfg.SessionCreateTimeout()),
606+
pool.WithCloseItemTimeout[*Session](cfg.SessionDeleteTimeout()),
607+
pool.WithMustDeleteItemFunc(func(s *Session, err error) bool {
580608
if !s.IsAlive() {
581609
return true
582610
}
583611

584612
return err != nil && xerrors.MustDeleteTableOrQuerySession(err)
585613
}),
586-
pool.WithIdleTimeToLive[*Session, Session](cfg.SessionIdleTimeToLive()),
614+
pool.WithIdleTimeToLive[*Session](cfg.SessionIdleTimeToLive()),
587615
pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) {
588616
var (
589617
createCtx context.Context
@@ -666,3 +694,28 @@ func poolTrace(t *trace.Query) *pool.Trace {
666694
},
667695
}
668696
}
697+
698+
func createImplicitSessionPool(ctx context.Context,
699+
cfg *config.Config,
700+
c Ydb_Query_V1.QueryServiceClient,
701+
cc grpc.ClientConnInterface,
702+
) sessionPool {
703+
return pool.New(ctx,
704+
pool.WithLimit[*Session](cfg.PoolLimit()),
705+
pool.WithTrace[*Session](poolTrace(cfg.Trace())),
706+
pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) {
707+
core := &sessionCore{
708+
cc: cc,
709+
Client: c,
710+
Trace: cfg.Trace(),
711+
done: make(chan struct{}),
712+
}
713+
714+
return &Session{
715+
Core: core,
716+
trace: cfg.Trace(),
717+
client: c,
718+
}, nil
719+
}),
720+
)
721+
}

internal/query/client_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"google.golang.org/protobuf/types/known/anypb"
2121

2222
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
23+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
2324
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
2425
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
2526
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -842,6 +843,13 @@ func TestClient(t *testing.T) {
842843
}), "")
843844
require.NoError(t, err)
844845
})
846+
847+
t.Run("AllowImplicitSessions", func(t *testing.T) {
848+
err := mockClientForImplicitSessionTest(ctx, t).
849+
Exec(ctx, "SELECT 1")
850+
851+
require.NoError(t, err)
852+
})
845853
})
846854
t.Run("Query", func(t *testing.T) {
847855
t.Run("HappyWay", func(t *testing.T) {
@@ -1079,6 +1087,12 @@ func TestClient(t *testing.T) {
10791087
require.Nil(t, r3)
10801088
}
10811089
})
1090+
t.Run("AllowImplicitSessions", func(t *testing.T) {
1091+
_, err := mockClientForImplicitSessionTest(ctx, t).
1092+
Query(ctx, "SELECT 1")
1093+
1094+
require.NoError(t, err)
1095+
})
10821096
})
10831097
t.Run("QueryResultSet", func(t *testing.T) {
10841098
t.Run("HappyWay", func(t *testing.T) {
@@ -1397,6 +1411,12 @@ func TestClient(t *testing.T) {
13971411
require.Nil(t, rs)
13981412
require.Equal(t, 0, rowsCount)
13991413
})
1414+
t.Run("AllowImplicitSessions", func(t *testing.T) {
1415+
_, err := mockClientForImplicitSessionTest(ctx, t).
1416+
QueryResultSet(ctx, "SELECT 1")
1417+
1418+
require.NoError(t, err)
1419+
})
14001420
})
14011421
t.Run("QueryRow", func(t *testing.T) {
14021422
t.Run("HappyWay", func(t *testing.T) {
@@ -1537,7 +1557,46 @@ func TestClient(t *testing.T) {
15371557
require.ErrorIs(t, err, errMoreThanOneRow)
15381558
require.Nil(t, row)
15391559
})
1560+
1561+
t.Run("AllowImplicitSessions", func(t *testing.T) {
1562+
_, err := mockClientForImplicitSessionTest(ctx, t).
1563+
QueryRow(ctx, "SELECT 1")
1564+
1565+
require.NoError(t, err)
1566+
})
15401567
})
1568+
1569+
t.Run("Close", func(t *testing.T) {
1570+
t.Run("AllowImplicitSessions", func(t *testing.T) {
1571+
client := mockClientForImplicitSessionTest(ctx, t)
1572+
_, err := client.QueryRow(ctx, "SELECT 1")
1573+
require.NoError(t, err)
1574+
1575+
err = client.Close(context.Background())
1576+
1577+
require.NoError(t, err)
1578+
})
1579+
})
1580+
}
1581+
1582+
// mockClientForImplicitSessionTest creates a new Client with a test balancer
1583+
// for simulating implicit session scenarios in query client testing. It configures
1584+
// the mock in such way that calling `CreateSession` or `AttachSession` will result in an error.
1585+
func mockClientForImplicitSessionTest(ctx context.Context, t *testing.T) *Client {
1586+
ctrl := gomock.NewController(t)
1587+
1588+
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
1589+
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
1590+
ResultSet: &Ydb.ResultSet{Rows: []*Ydb.Value{{}}},
1591+
}, nil)
1592+
stream.EXPECT().Recv().Return(nil, io.EOF)
1593+
1594+
queryService := NewMockQueryServiceClient(ctrl)
1595+
queryService.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
1596+
1597+
cfg := config.New(config.AllowImplicitSessions())
1598+
1599+
return newWithQueryServiceClient(ctx, queryService, nil, cfg)
15411600
}
15421601

15431602
type sessionControllerMock struct {

internal/query/config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ type Config struct {
2525
sessionDeleteTimeout time.Duration
2626
sessionIddleTimeToLive time.Duration
2727

28+
allowImplicitSessions bool
29+
2830
lazyTx bool
2931

3032
trace *trace.Query
@@ -62,6 +64,10 @@ func (c *Config) PoolLimit() int {
6264
return c.poolLimit
6365
}
6466

67+
func (c *Config) AllowImplicitSessions() bool {
68+
return c.allowImplicitSessions
69+
}
70+
6571
func (c *Config) PoolSessionUsageLimit() uint64 {
6672
return c.poolSessionUsageLimit
6773
}

internal/query/config/options.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ func WithSessionIdleTimeToLive(idleTimeToLive time.Duration) Option {
8080
}
8181
}
8282

83+
func AllowImplicitSessions() Option {
84+
return func(c *Config) {
85+
c.allowImplicitSessions = true
86+
}
87+
}
88+
8389
func WithLazyTx(lazyTx bool) Option {
8490
return func(c *Config) {
8591
c.lazyTx = lazyTx

internal/query/session_core.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,9 @@ func (core *sessionCore) IsAlive() bool {
282282
}
283283

284284
func (core *sessionCore) Close(ctx context.Context) (err error) {
285-
defer core.closeOnce()
285+
if core.closeOnce != nil {
286+
defer core.closeOnce()
287+
}
286288

287289
select {
288290
case <-core.done:

query/client.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"time"
66

77
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
89
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
910
"github.com/ydb-platform/ydb-go-sdk/v3/retry/budget"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -152,3 +153,12 @@ func WithLabel(lbl string) options.LabelOption {
152153
func WithRetryBudget(b budget.Budget) options.RetryOptionsOption {
153154
return options.WithRetryBudget(b)
154155
}
156+
157+
// AllowImplicitSessions is an option to execute queries using an implicit session
158+
// which allows the queries to be executed without explicitly creating a session.
159+
// Please note that requests with this option use a separate session pool.
160+
//
161+
// Working with `query.Client.{Exec,Query,QueryResultSet,QueryRow}`.
162+
func AllowImplicitSessions() config.Option {
163+
return config.AllowImplicitSessions()
164+
}

0 commit comments

Comments
 (0)