Skip to content

Commit 7a18b60

Browse files
authored
Merge pull request #9845 from ellemouton/genericBatchTxScheduler
batch: generic scheduler + benchmark tests
2 parents c654416 + 2fd3297 commit 7a18b60

File tree

6 files changed

+791
-59
lines changed

6 files changed

+791
-59
lines changed

batch/batch.go

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package batch
22

33
import (
4+
"context"
45
"errors"
56
"sync"
67

7-
"github.com/lightningnetwork/lnd/kvdb"
88
"github.com/lightningnetwork/lnd/sqldb"
99
)
1010

@@ -14,28 +14,44 @@ var errSolo = errors.New(
1414
"batch function returned an error and should be re-run solo",
1515
)
1616

17-
type request struct {
18-
*Request
17+
// txOpts implements the sqldb.TxOptions interface. It is used to indicate that
18+
// the transaction can be read-only or not transaction.
19+
type txOpts struct {
20+
readOnly bool
21+
}
22+
23+
// ReadOnly returns true if the transaction should be read only.
24+
//
25+
// NOTE: This is part of the sqldb.TxOptions interface.
26+
func (t *txOpts) ReadOnly() bool {
27+
return t.readOnly
28+
}
29+
30+
type request[Q any] struct {
31+
*Request[Q]
1932
errChan chan error
2033
}
2134

22-
type batch struct {
23-
db kvdb.Backend
35+
type batch[Q any] struct {
36+
db sqldb.BatchedTx[Q]
2437
start sync.Once
25-
reqs []*request
26-
clear func(b *batch)
38+
reqs []*request[Q]
39+
clear func(b *batch[Q])
2740
locker sync.Locker
41+
txOpts txOpts
2842
}
2943

3044
// trigger is the entry point for the batch and ensures that run is started at
3145
// most once.
32-
func (b *batch) trigger() {
33-
b.start.Do(b.run)
46+
func (b *batch[Q]) trigger(ctx context.Context) {
47+
b.start.Do(func() {
48+
b.run(ctx)
49+
})
3450
}
3551

3652
// run executes the current batch of requests. If any individual requests fail
3753
// alongside others they will be retried by the caller.
38-
func (b *batch) run() {
54+
func (b *batch[Q]) run(ctx context.Context) {
3955
// Clear the batch from its scheduler, ensuring that no new requests are
4056
// added to this batch.
4157
b.clear(b)
@@ -52,9 +68,9 @@ func (b *batch) run() {
5268
// that fail will be retried individually.
5369
for len(b.reqs) > 0 {
5470
var failIdx = -1
55-
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
71+
err := b.db.ExecTx(ctx, &b.txOpts, func(tx Q) error {
5672
for i, req := range b.reqs {
57-
err := req.Update(tx)
73+
err := req.Do(tx)
5874
if err != nil {
5975
// If we get a serialization error, we
6076
// want the underlying SQL retry

0 commit comments

Comments
 (0)