Skip to content

Commit c667551

Browse files
authored
Merge branch 'master' into back-comp
2 parents 8bb2c68 + 304cde8 commit c667551

File tree

10 files changed

+79
-14
lines changed

10 files changed

+79
-14
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
* Added `Struct` support for `Variant` in `ydb.ParamsBuilder()`
21
* Restored `WithSessionPoolKeepAliveMinSize` and `WithSessionPoolKeepAliveTimeout` for backward compatibility.
2+
* Fixed leak timers
3+
* Changed default StartTime (time of retries for connect to server) for topic writer from 1 minute to infinite (can be overrided by WithWriterStartTimeout topic option)
4+
* Added `Struct` support for `Variant` in `ydb.ParamsBuilder()`
5+
* Added `go` with anonymous function case in `gstack`
36

47
## v3.61.2
58
* Changed default transaction control to `NoTx` for execute query through query service client

internal/cmd/gstack/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ func getCallExpressionsFromStmt(statement ast.Stmt) (listOfCallExpressions []*as
7979
body = stmt.Body
8080
case *ast.ForStmt:
8181
body = stmt.Body
82+
case *ast.GoStmt:
83+
if fun, ok := stmt.Call.Fun.(*ast.FuncLit); ok {
84+
listOfCallExpressions = append(listOfCallExpressions, getListOfCallExpressionsFromBlockStmt(fun.Body)...)
85+
} else {
86+
listOfCallExpressions = append(listOfCallExpressions, stmt.Call)
87+
}
8288
case *ast.RangeStmt:
8389
body = stmt.Body
8490
case *ast.DeclStmt:

internal/coordination/session.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,12 @@ func (s *session) newStream(
139139

140140
var client Ydb_Coordination_V1.CoordinationService_SessionClient
141141
if lastChance {
142+
timer := time.NewTimer(s.options.SessionKeepAliveTimeout)
142143
select {
143-
case <-time.After(s.options.SessionKeepAliveTimeout):
144+
case <-timer.C:
144145
case client = <-result:
145146
}
147+
timer.Stop()
146148

147149
if client != nil {
148150
return client, nil
@@ -175,10 +177,12 @@ func (s *session) newStream(
175177
}
176178

177179
// Waiting for some time before trying to reconnect.
180+
sessionReconnectDelay := time.NewTimer(s.options.SessionReconnectDelay)
178181
select {
179-
case <-time.After(s.options.SessionReconnectDelay):
182+
case <-sessionReconnectDelay.C:
180183
case <-s.ctx.Done():
181184
}
185+
sessionReconnectDelay.Stop()
182186

183187
if s.ctx.Err() != nil {
184188
// Give this session the last chance to stop gracefully if the session is canceled in the reconnect cycle.
@@ -247,6 +251,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
247251

248252
// Wait for the session started response unless the stream context is done. We intentionally do not take into
249253
// account stream context cancellation in order to proceed with the graceful shutdown if it requires reconnect.
254+
sessionStartTimer := time.NewTimer(s.options.SessionStartTimeout)
250255
select {
251256
case start := <-sessionStarted:
252257
trace.CoordinationOnSessionStarted(s.client.config.Trace(), start.GetSessionId(), s.sessionID)
@@ -258,13 +263,14 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
258263
cancelStream()
259264
}
260265
close(startSending)
261-
case <-time.After(s.options.SessionStartTimeout):
266+
case <-sessionStartTimer.C:
262267
// Reconnect if no response was received before the timeout occurred.
263268
trace.CoordinationOnSessionStartTimeout(s.client.config.Trace(), s.options.SessionStartTimeout)
264269
cancelStream()
265270
case <-streamCtx.Done():
266271
case <-s.ctx.Done():
267272
}
273+
sessionStartTimer.Stop()
268274

269275
for {
270276
// Respect the failure reason priority: if the session context is done, we must stop the session, even
@@ -280,8 +286,9 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
280286
}
281287

282288
keepAliveTime := time.Until(s.getLastGoodResponseTime().Add(s.options.SessionKeepAliveTimeout))
289+
keepAliveTimeTimer := time.NewTimer(keepAliveTime)
283290
select {
284-
case <-time.After(keepAliveTime):
291+
case <-keepAliveTimeTimer.C:
285292
last := s.getLastGoodResponseTime()
286293
if time.Since(last) > s.options.SessionKeepAliveTimeout {
287294
// Reconnect if the underlying stream is likely to be dead.
@@ -295,6 +302,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
295302
case <-streamCtx.Done():
296303
case <-s.ctx.Done():
297304
}
305+
keepAliveTimeTimer.Stop()
298306
}
299307

300308
if closing {
@@ -318,8 +326,10 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
318326
)
319327

320328
// Wait for the session stopped response unless the stream context is done.
329+
sessionStopTimeout := time.NewTimer(s.options.SessionStopTimeout)
321330
select {
322331
case stop := <-sessionStopped:
332+
sessionStopTimeout.Stop()
323333
trace.CoordinationOnSessionStopped(s.client.config.Trace(), stop.GetSessionId(), s.sessionID)
324334
if stop.GetSessionId() == s.sessionID {
325335
cancelStream()
@@ -329,15 +339,19 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
329339

330340
// Reconnect if the server response is invalid.
331341
cancelStream()
332-
case <-time.After(s.options.SessionStopTimeout):
342+
case <-sessionStopTimeout.C:
343+
sessionStopTimeout.Stop() // no really need, call stop for common style only
344+
333345
// Reconnect if no response was received before the timeout occurred.
334346
trace.CoordinationOnSessionStopTimeout(s.client.config.Trace(), s.options.SessionStopTimeout)
335347
cancelStream()
336348
case <-s.ctx.Done():
349+
sessionStopTimeout.Stop()
337350
cancelStream()
338351

339352
return
340353
case <-streamCtx.Done():
354+
sessionStopTimeout.Stop()
341355
}
342356
}
343357

internal/stack/function_id_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package stack
22

33
import (
4+
"sync"
45
"testing"
6+
"time"
57

68
"github.com/stretchr/testify/require"
79
)
@@ -22,6 +24,22 @@ func (e *starType) starredCall() string {
2224
return FunctionID("").FunctionID()
2325
}
2426

27+
func anonymousFunctionCall() string {
28+
var result string
29+
var mu sync.Mutex
30+
go func() {
31+
mu.Lock()
32+
defer mu.Unlock()
33+
result = FunctionID("").FunctionID()
34+
}()
35+
time.Sleep(time.Second)
36+
37+
mu.Lock()
38+
defer mu.Unlock()
39+
40+
return result
41+
}
42+
2543
func TestFunctionIDForGenericType(t *testing.T) {
2644
t.Run("StaticFunc", func(t *testing.T) {
2745
require.Equal(t,
@@ -42,4 +60,10 @@ func TestFunctionIDForGenericType(t *testing.T) {
4260
x.starredCall(),
4361
)
4462
})
63+
t.Run("AnonymousFunctionCall", func(t *testing.T) {
64+
require.Equal(t,
65+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack.anonymousFunctionCall",
66+
anonymousFunctionCall(),
67+
)
68+
})
4569
}

internal/table/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,10 @@ func (c *Client) internalPoolWaitFromCh(ctx context.Context, t *trace.Table) (s
489489

490490
var createSessionTimeoutCh <-chan time.Time
491491
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
492-
createSessionTimeoutCh = c.clock.After(timeout)
492+
createSessionTimeoutChTimer := c.clock.NewTimer(timeout)
493+
defer createSessionTimeoutChTimer.Stop()
494+
495+
createSessionTimeoutCh = createSessionTimeoutChTimer.Chan()
493496
}
494497

495498
select {

internal/topic/retriable_error.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
)
1414

1515
const (
16-
DefaultStartTimeout = time.Minute
16+
DefaultStartTimeout = value.InfiniteDuration
1717
connectionEstablishedTimeout = time.Minute
1818
)
1919

internal/topic/topicreaderinternal/committer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,10 @@ func (c *committer) waitSendTrigger(ctx context.Context) {
166166
return
167167
}
168168

169-
finish := c.clock.After(c.BufferTimeLagTrigger)
169+
bufferTimeLagTriggerTimer := c.clock.NewTimer(c.BufferTimeLagTrigger)
170+
defer bufferTimeLagTriggerTimer.Stop()
171+
172+
finish := bufferTimeLagTriggerTimer.Chan()
170173
if c.BufferCountTrigger == 0 {
171174
select {
172175
case <-ctxDone:

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,9 +328,12 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
328328
result <- connectResult{stream: stream, err: err}
329329
}()
330330

331+
connectionTimoutTimer := r.clock.NewTimer(r.connectTimeout)
332+
defer connectionTimoutTimer.Stop()
333+
331334
var res connectResult
332335
select {
333-
case <-r.clock.After(r.connectTimeout):
336+
case <-connectionTimoutTimer.Chan():
334337
// cancel connection context only if timeout exceed while connection
335338
// because if cancel context after connect - it will break
336339
cancel()

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -380,16 +380,21 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
380380
prevAttemptTime = now
381381

382382
if reconnectReason != nil {
383-
if backoff, retry := topic.CheckRetryMode(reconnectReason, w.retrySettings, w.clock.Since(startOfRetries)); retry {
383+
retryDuration := w.clock.Since(startOfRetries)
384+
if backoff, retry := topic.CheckRetryMode(reconnectReason, w.retrySettings, retryDuration); retry {
384385
delay := backoff.Delay(attempt)
386+
delayTimer := w.clock.NewTimer(delay)
385387
select {
386388
case <-doneCtx:
389+
delayTimer.Stop()
390+
387391
return
388-
case <-w.clock.After(delay):
392+
case <-delayTimer.Chan():
393+
delayTimer.Stop() // no really need, stop for common style only
389394
// pass
390395
}
391396
} else {
392-
_ = w.close(ctx, reconnectReason)
397+
_ = w.close(ctx, fmt.Errorf("%w, was retried (%v)", reconnectReason, retryDuration))
393398

394399
return
395400
}

internal/xsql/connector.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,10 +262,14 @@ func (c *Connector) idleCloser() (idleStopper func()) {
262262
ctx, idleStopper = xcontext.WithCancel(context.Background())
263263
go func() {
264264
for {
265+
idleThresholdTimer := c.clock.NewTimer(c.idleThreshold)
265266
select {
266267
case <-ctx.Done():
268+
idleThresholdTimer.Stop()
269+
267270
return
268-
case <-c.clock.After(c.idleThreshold):
271+
case <-idleThresholdTimer.Chan():
272+
idleThresholdTimer.Stop() // no really need, stop for common style only
269273
c.connsMtx.RLock()
270274
conns := make([]*conn, 0, len(c.conns))
271275
for cc := range c.conns {

0 commit comments

Comments
 (0)