Skip to content

Commit 2e23e14

Browse files
authored
Merge pull request #1598 from ydb-platform/fix-goroutine-leak
* Fixed goroutine leak on failed execute call in query client
2 parents 2855451 + 45f8ca2 commit 2e23e14

File tree

3 files changed

+39
-11
lines changed

3 files changed

+39
-11
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed goroutine leak on failed execute call in query client
2+
13
## v3.95.4
24
* Fixed connections pool leak on closing sessions
35
* Fixed an error in logging session deletion events

internal/query/execute_query.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,22 @@ func execute(
126126
return nil, xerrors.WithStackTrace(err)
127127
}
128128

129-
executeCtx := xcontext.ValueOnly(ctx)
129+
executeCtx, executeCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
130+
defer func() {
131+
if finalErr != nil {
132+
executeCancel()
133+
}
134+
}()
130135

131136
stream, err := c.ExecuteQuery(executeCtx, request, callOptions...)
132137
if err != nil {
133138
return nil, xerrors.WithStackTrace(err)
134139
}
135140

136-
r, err := newResult(ctx, stream, append(opts, withStatsCallback(settings.StatsCallback()))...)
141+
r, err := newResult(ctx, stream, append(opts,
142+
withStatsCallback(settings.StatsCallback()),
143+
withOnClose(executeCancel),
144+
)...)
137145
if err != nil {
138146
return nil, xerrors.WithStackTrace(err)
139147
}

internal/query/result.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type (
3939
closed chan struct{}
4040
trace *trace.Query
4141
statsCallback func(queryStats stats.QueryStats)
42+
onClose []func()
4243
onNextPartErr []func(err error)
4344
onTxMeta []func(txMeta *Ydb_Query.TransactionMeta)
4445
}
@@ -98,6 +99,12 @@ func withStatsCallback(callback func(queryStats stats.QueryStats)) resultOption
9899
}
99100
}
100101

102+
func withOnClose(onClose func()) resultOption {
103+
return func(s *streamResult) {
104+
s.onClose = append(s.onClose, onClose)
105+
}
106+
}
107+
101108
func onNextPartErr(callback func(err error)) resultOption {
102109
return func(s *streamResult) {
103110
s.onNextPartErr = append(s.onNextPartErr, callback)
@@ -115,22 +122,33 @@ func newResult(
115122
stream Ydb_Query_V1.QueryService_ExecuteQueryClient,
116123
opts ...resultOption,
117124
) (_ *streamResult, finalErr error) {
118-
r := streamResult{
119-
stream: stream,
120-
closed: make(chan struct{}),
121-
resultSetIndex: -1,
122-
}
123-
r.closeOnce = sync.OnceFunc(func() {
124-
close(r.closed)
125-
r.stream = nil
126-
})
125+
var (
126+
closed = make(chan struct{})
127+
r = streamResult{
128+
stream: stream,
129+
onClose: []func(){
130+
func() {
131+
close(closed)
132+
},
133+
},
134+
closed: closed,
135+
resultSetIndex: -1,
136+
}
137+
)
127138

128139
for _, opt := range opts {
129140
if opt != nil {
130141
opt(&r)
131142
}
132143
}
133144

145+
r.closeOnce = sync.OnceFunc(func() {
146+
for _, onClose := range r.onClose {
147+
onClose()
148+
}
149+
r.stream = nil
150+
})
151+
134152
if r.trace != nil {
135153
onDone := trace.QueryOnResultNew(r.trace, &ctx,
136154
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.newResult"),

0 commit comments

Comments
 (0)