Skip to content

Commit 09b942e

Browse files
committed
add waitgroup to spawner
1 parent c2d5e5f commit 09b942e

File tree

1 file changed

+15
-0
lines changed

1 file changed

+15
-0
lines changed

internal/pool/pool.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pool
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
"golang.org/x/sync/errgroup"
@@ -62,6 +63,8 @@ type (
6263
stats *safeStats
6364

6465
spawnCancel context.CancelFunc
66+
67+
wg *sync.WaitGroup
6568
}
6669
option[PT Item[T], T any] func(p *Pool[PT, T])
6770
)
@@ -205,7 +208,9 @@ func New[PT Item[T], T any](
205208
}
206209

207210
var spawnCtx context.Context
211+
p.wg = &sync.WaitGroup{}
208212
spawnCtx, p.spawnCancel = xcontext.WithCancel(xcontext.ValueOnly(ctx))
213+
p.wg.Add(1)
209214
go p.spawnItems(spawnCtx)
210215

211216
return p
@@ -215,13 +220,17 @@ func New[PT Item[T], T any](
215220
// It ensures that pool would always have amount of connections equal to configured limit.
216221
// If item creation ended with error it will be retried infinity with configured interval until success.
217222
func (p *Pool[PT, T]) spawnItems(ctx context.Context) {
223+
defer p.wg.Done()
218224
for {
219225
select {
226+
case <-ctx.Done():
227+
return
220228
case <-p.done:
221229
return
222230
case <-p.itemTokens:
223231
// got token, must create item
224232
for {
233+
p.wg.Add(1)
225234
err := p.trySpawn(ctx)
226235
if err == nil {
227236
break
@@ -234,12 +243,15 @@ func (p *Pool[PT, T]) spawnItems(ctx context.Context) {
234243
}
235244

236245
func (p *Pool[PT, T]) trySpawn(ctx context.Context) error {
246+
defer p.wg.Done()
237247
item, err := p.createItem(ctx)
238248
if err != nil {
239249
return err
240250
}
241251
// item was created successfully, put it in queue
242252
select {
253+
case <-ctx.Done():
254+
return ctx.Err()
243255
case <-p.done:
244256
return nil
245257
case p.queue <- item:
@@ -523,6 +535,9 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
523535
// Due to multiple senders queue is not closed here,
524536
// we're just making sure to drain it fully to close any existing item.
525537
close(p.done)
538+
539+
p.wg.Wait()
540+
526541
var g errgroup.Group
527542
shutdownLoop:
528543
for {

0 commit comments

Comments
 (0)