Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
dd5c6d1
macaroons: remove context.TODO() in tests
ellemouton Apr 7, 2025
6ff0c85
kvdb/etcd: remove context.TODO() from test helpers
ellemouton Apr 7, 2025
53ba229
lnd: pass context to `newServer` and `server.Start`
ellemouton Apr 7, 2025
7fd453f
discovery: thread context through to gossiper
ellemouton Apr 7, 2025
4a30d62
discovery: pass context through to reliable sender
ellemouton Apr 7, 2025
2a5235a
discovery: thread contexts to syncer
ellemouton Apr 7, 2025
69e9ce9
discovery: thread contexts through sync manager
ellemouton Apr 7, 2025
f4b7cc4
discovery: pass context to ProcessRemoteAnnouncement
ellemouton Apr 7, 2025
291381e
discovery: pass context through to bootstrapper SampleNodeAddrs
ellemouton Apr 8, 2025
6d03df7
discovery: remove unnecessary context.Background() calls
ellemouton Apr 9, 2025
3162837
discovery: listen on ctx in any select
ellemouton Apr 9, 2025
e2b5928
graph/db: test clean-up
ellemouton Apr 5, 2025
5e3d66a
graph/db: remove kvdb param from test helper
ellemouton Apr 5, 2025
b83b401
graph/db: remove kvdb.Backend from test helpers
ellemouton Apr 5, 2025
2f54a0a
graph/db: use only exported KVStore ForEachNode method in tests
ellemouton Apr 9, 2025
13fcb08
autopilot: start threading contexts through
ellemouton Apr 9, 2025
1c6c643
autopilot: continue threading context
ellemouton Apr 9, 2025
3f218df
autopilot: update AttachmentHeuristics with context
ellemouton Apr 9, 2025
cf1ec68
graph/db: remove unused Wipe method
ellemouton Mar 29, 2025
42e8739
graph/db: introduce ForEachSourceNodeChannel
ellemouton Mar 26, 2025
ea4b8c3
graph/db: unexport various methods that expose `kvdb.RTx`
ellemouton Mar 27, 2025
3ec4798
graph/db: use only exported KVStore methods in tests
ellemouton Apr 5, 2025
da3deee
multi: remove kvdb.RTx from ForEachNodeChannel
ellemouton Mar 26, 2025
c68a19c
discovery: revert passing ctx through to Start methods
ellemouton Apr 11, 2025
95d34be
autopilot: revert passing ctx to Start methods
ellemouton Apr 11, 2025
8468c7c
graph/db: let test alias be UTF-8 compatible
ellemouton Apr 5, 2025
258dd12
graph/db: update the `compareNodes` helper
ellemouton Apr 5, 2025
a5c1219
graph: test cleanup
ellemouton Mar 30, 2025
5d7c6c4
channeldb: remove graph calls from tests
ellemouton Apr 5, 2025
11b27f0
batch: dont expose kvdb.RwTx in batch.SchedulerOptions
ellemouton Mar 30, 2025
52018db
graph/db: introduce the V1Store interface
ellemouton Apr 5, 2025
449684d
graph/db: use V1Store interface in ChannelGraph
ellemouton Apr 5, 2025
526fb7f
graph/db: init KVStore outside of ChannelGraph
ellemouton Mar 30, 2025
0db9130
graph/db: make all ExtraOpaqueData valid TLV streams
ellemouton Apr 5, 2025
1410a09
lnwire: validate that gossip messages contain valid TLV
ellemouton May 7, 2025
227e49d
graph/db: expand TestNodeInsertionAndDeletion
ellemouton May 8, 2025
95220b7
graph/db: check for wrapped errors
ellemouton May 8, 2025
a9e6454
graph/db: set empty Features and ExtraOpaqueData in tests
ellemouton May 8, 2025
3dfc172
graph/db: use mainnet genisis hash in tests
ellemouton May 8, 2025
44a92b7
graph/db: add test coverage for AddEdgeProof
ellemouton May 8, 2025
4233649
graph/db: let MakeTestGraph require no error internally
ellemouton May 9, 2025
7288f28
routing+autopilot: rename mission control store var
ellemouton May 9, 2025
7a348e3
graph/db: let MakeTestGraph take ChanGraphOptions
ellemouton May 9, 2025
dc353dc
multi: use MakeTestGraph everywhere for test graph creation
ellemouton May 9, 2025
b285546
graph/db: add NewTestDB method
ellemouton May 9, 2025
3834536
sqldb+go.mod: update sqldb test methods
ellemouton Apr 5, 2025
b4121ac
graph/db: add a framework for testing against SQL backends incrementally
ellemouton Apr 5, 2025
be915f2
sqldb: add support for test migrations
ellemouton Apr 16, 2025
3bc58a0
graph/db: test the non-cached version of ForEachNodeDirectedChannel
ellemouton May 9, 2025
629e74a
graph/db: add various error assertions
ellemouton May 16, 2025
9b9a964
graph/db: more test coverage for node addresses
ellemouton May 19, 2025
f5a466b
batch+graph: update batch.Schedular to be generic
ellemouton May 21, 2025
bb95b0d
batch: add benchmark tests for KVDB batch writes
ellemouton May 21, 2025
e743878
batch: add a benchmark for SQLite and Postgres
ellemouton May 21, 2025
df1e6da
batch: update to allow for read-only calls
ellemouton May 22, 2025
7cef62f
docs: update release notes
ellemouton May 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ jobs:
- unit tags="kvdb_etcd"
- unit tags="kvdb_postgres"
- unit tags="kvdb_sqlite"
- unit tags="test_db_sqlite"
- unit tags="test_db_postgres"
- unit-race
- unit-module

Expand Down
34 changes: 23 additions & 11 deletions autopilot/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autopilot

import (
"bytes"
"context"
"fmt"
"math/rand"
"net"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/lnwire"
)

Expand Down Expand Up @@ -166,8 +168,9 @@ type Agent struct {
pendingOpens map[NodeID]LocalChannel
pendingMtx sync.Mutex

quit chan struct{}
wg sync.WaitGroup
quit chan struct{}
wg sync.WaitGroup
cancel fn.Option[context.CancelFunc]
}

// New creates a new instance of the Agent instantiated using the passed
Expand Down Expand Up @@ -202,17 +205,20 @@ func New(cfg Config, initialState []LocalChannel) (*Agent, error) {
func (a *Agent) Start() error {
var err error
a.started.Do(func() {
err = a.start()
ctx, cancel := context.WithCancel(context.Background())
a.cancel = fn.Some(cancel)

err = a.start(ctx)
})
return err
}

func (a *Agent) start() error {
func (a *Agent) start(ctx context.Context) error {
rand.Seed(time.Now().Unix())
log.Infof("Autopilot Agent starting")

a.wg.Add(1)
go a.controller()
go a.controller(ctx)

return nil
}
Expand All @@ -230,6 +236,7 @@ func (a *Agent) Stop() error {
func (a *Agent) stop() error {
log.Infof("Autopilot Agent stopping")

a.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
close(a.quit)
a.wg.Wait()

Expand Down Expand Up @@ -401,7 +408,7 @@ func mergeChanState(pendingChans map[NodeID]LocalChannel,
// and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node.
func (a *Agent) controller() {
func (a *Agent) controller(ctx context.Context) {
defer a.wg.Done()

// We'll start off by assigning our starting balance, and injecting
Expand Down Expand Up @@ -502,6 +509,9 @@ func (a *Agent) controller() {
// immediately.
case <-a.quit:
return

case <-ctx.Done():
return
}

a.pendingMtx.Lock()
Expand Down Expand Up @@ -539,7 +549,7 @@ func (a *Agent) controller() {
log.Infof("Triggering attachment directive dispatch, "+
"total_funds=%v", a.totalBalance)

err := a.openChans(availableFunds, numChans, totalChans)
err := a.openChans(ctx, availableFunds, numChans, totalChans)
if err != nil {
log.Errorf("Unable to open channels: %v", err)
}
Expand All @@ -548,8 +558,8 @@ func (a *Agent) controller() {

// openChans queries the agent's heuristic for a set of channel candidates, and
// attempts to open channels to them.
func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
totalChans []LocalChannel) error {
func (a *Agent) openChans(ctx context.Context, availableFunds btcutil.Amount,
numChans uint32, totalChans []LocalChannel) error {

// As channel size we'll use the maximum channel size available.
chanSize := a.cfg.Constraints.MaxChanSize()
Expand Down Expand Up @@ -598,7 +608,9 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
selfPubBytes := a.cfg.Self.SerializeCompressed()
nodes := make(map[NodeID]struct{})
addresses := make(map[NodeID][]net.Addr)
if err := a.cfg.Graph.ForEachNode(func(node Node) error {
if err := a.cfg.Graph.ForEachNode(ctx, func(_ context.Context,
node Node) error {

nID := NodeID(node.PubKey())

// If we come across ourselves, them we'll continue in
Expand Down Expand Up @@ -636,7 +648,7 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
// graph.
log.Debugf("Scoring %d nodes for chan_size=%v", len(nodes), chanSize)
scores, err := a.cfg.Heuristic.NodeScores(
a.cfg.Graph, totalChans, chanSize, nodes,
ctx, a.cfg.Graph, totalChans, chanSize, nodes,
)
if err != nil {
return fmt.Errorf("unable to calculate node scores : %w", err)
Expand Down
7 changes: 4 additions & 3 deletions autopilot/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -85,9 +86,9 @@ func (m *mockHeuristic) Name() string {
return "mock"
}

func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []LocalChannel,
chanSize btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*NodeScore, error) {
func (m *mockHeuristic) NodeScores(_ context.Context, g ChannelGraph,
chans []LocalChannel, chanSize btcutil.Amount,
nodes map[NodeID]struct{}) (map[NodeID]*NodeScore, error) {

if m.nodeScoresArgs != nil {
directive := directiveArg{
Expand Down
7 changes: 5 additions & 2 deletions autopilot/betweenness_centrality.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"fmt"
"sync"
)
Expand Down Expand Up @@ -168,8 +169,10 @@ func betweennessCentrality(g *SimpleGraph, s int, centrality []float64) {
}

// Refresh recalculates and stores centrality values.
func (bc *BetweennessCentrality) Refresh(graph ChannelGraph) error {
cache, err := NewSimpleGraph(graph)
func (bc *BetweennessCentrality) Refresh(ctx context.Context,
graph ChannelGraph) error {

cache, err := NewSimpleGraph(ctx, graph)
if err != nil {
return err
}
Expand Down
11 changes: 9 additions & 2 deletions autopilot/betweenness_centrality_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -30,6 +31,9 @@ func TestBetweennessCentralityMetricConstruction(t *testing.T) {

// Tests that empty graph results in empty centrality result.
func TestBetweennessCentralityEmptyGraph(t *testing.T) {
t.Parallel()
ctx := context.Background()

centralityMetric, err := NewBetweennessCentralityMetric(1)
require.NoError(
t, err,
Expand All @@ -42,7 +46,7 @@ func TestBetweennessCentralityEmptyGraph(t *testing.T) {
require.NoError(t, err, "unable to create graph")

success := t.Run(chanGraph.name, func(t1 *testing.T) {
err = centralityMetric.Refresh(graph)
err = centralityMetric.Refresh(ctx, graph)
require.NoError(t1, err)

centrality := centralityMetric.GetMetric(false)
Expand All @@ -59,6 +63,9 @@ func TestBetweennessCentralityEmptyGraph(t *testing.T) {

// Test betweenness centrality calculating using an example graph.
func TestBetweennessCentralityWithNonEmptyGraph(t *testing.T) {
t.Parallel()
ctx := context.Background()

workers := []int{1, 3, 9, 100}

tests := []struct {
Expand Down Expand Up @@ -100,7 +107,7 @@ func TestBetweennessCentralityWithNonEmptyGraph(t *testing.T) {
t1, graph, centralityTestGraph,
)

err = metric.Refresh(graph)
err = metric.Refresh(ctx, graph)
require.NoError(t1, err)

for _, expected := range tests {
Expand Down
9 changes: 5 additions & 4 deletions autopilot/combinedattach.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"fmt"

"github.com/btcsuite/btcd/btcutil"
Expand Down Expand Up @@ -70,9 +71,9 @@ func (c *WeightedCombAttachment) Name() string {
// is the maximum possible improvement in connectivity.
//
// NOTE: This is a part of the AttachmentHeuristic interface.
func (c *WeightedCombAttachment) NodeScores(g ChannelGraph, chans []LocalChannel,
chanSize btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*NodeScore, error) {
func (c *WeightedCombAttachment) NodeScores(ctx context.Context, g ChannelGraph,
chans []LocalChannel, chanSize btcutil.Amount,
nodes map[NodeID]struct{}) (map[NodeID]*NodeScore, error) {

// We now query each heuristic to determine the score they give to the
// nodes for the given channel size.
Expand All @@ -81,7 +82,7 @@ func (c *WeightedCombAttachment) NodeScores(g ChannelGraph, chans []LocalChannel
log.Tracef("Getting scores from sub heuristic %v", h.Name())

s, err := h.NodeScores(
g, chans, chanSize, nodes,
ctx, g, chans, chanSize, nodes,
)
if err != nil {
return nil, fmt.Errorf("unable to get sub score: %w",
Expand Down
7 changes: 4 additions & 3 deletions autopilot/externalscoreattach.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -80,9 +81,9 @@ func (s *ExternalScoreAttachment) SetNodeScores(targetHeuristic string,
// not known will get a score of 0.
//
// NOTE: This is a part of the AttachmentHeuristic interface.
func (s *ExternalScoreAttachment) NodeScores(g ChannelGraph, chans []LocalChannel,
chanSize btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*NodeScore, error) {
func (s *ExternalScoreAttachment) NodeScores(_ context.Context, g ChannelGraph,
chans []LocalChannel, chanSize btcutil.Amount,
nodes map[NodeID]struct{}) (map[NodeID]*NodeScore, error) {

existingPeers := make(map[NodeID]struct{})
for _, c := range chans {
Expand Down
4 changes: 3 additions & 1 deletion autopilot/externalscoreattach_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot_test

import (
"context"
"testing"

"github.com/btcsuite/btcd/btcec/v2"
Expand All @@ -22,6 +23,7 @@ func randKey() (*btcec.PublicKey, error) {
// ExternalScoreAttachment correctly reflects the scores we set last.
func TestSetNodeScores(t *testing.T) {
t.Parallel()
ctx := context.Background()

const name = "externalscore"

Expand Down Expand Up @@ -62,7 +64,7 @@ func TestSetNodeScores(t *testing.T) {
q[nID] = struct{}{}
}
resp, err := h.NodeScores(
nil, nil, btcutil.Amount(btcutil.SatoshiPerBitcoin), q,
ctx, nil, nil, btcutil.Amount(btcutil.SatoshiPerBitcoin), q,
)
if err != nil {
t.Fatal(err)
Expand Down
33 changes: 23 additions & 10 deletions autopilot/graph.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"encoding/hex"
"net"
"sort"
Expand Down Expand Up @@ -80,7 +81,9 @@ func (d *dbNode) Addrs() []net.Addr {
// describes the active channel.
//
// NOTE: Part of the autopilot.Node interface.
func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
func (d *dbNode) ForEachChannel(ctx context.Context,
cb func(context.Context, ChannelEdge) error) error {

return d.tx.ForEachChannel(func(ei *models.ChannelEdgeInfo, ep,
_ *models.ChannelEdgePolicy) error {

Expand Down Expand Up @@ -108,7 +111,7 @@ func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
},
}

return cb(edge)
return cb(ctx, edge)
})
}

Expand All @@ -117,7 +120,9 @@ func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
// error, then execution should be terminated.
//
// NOTE: Part of the autopilot.ChannelGraph interface.
func (d *databaseChannelGraph) ForEachNode(cb func(Node) error) error {
func (d *databaseChannelGraph) ForEachNode(ctx context.Context,
cb func(context.Context, Node) error) error {

return d.db.ForEachNode(func(nodeTx graphdb.NodeRTx) error {
// We'll skip over any node that doesn't have any advertised
// addresses. As we won't be able to reach them to actually
Expand All @@ -129,7 +134,8 @@ func (d *databaseChannelGraph) ForEachNode(cb func(Node) error) error {
node := &dbNode{
tx: nodeTx,
}
return cb(node)

return cb(ctx, node)
})
}

Expand Down Expand Up @@ -185,7 +191,9 @@ func (nc dbNodeCached) Addrs() []net.Addr {
// describes the active channel.
//
// NOTE: Part of the autopilot.Node interface.
func (nc dbNodeCached) ForEachChannel(cb func(ChannelEdge) error) error {
func (nc dbNodeCached) ForEachChannel(ctx context.Context,
cb func(context.Context, ChannelEdge) error) error {

for cid, channel := range nc.channels {
edge := ChannelEdge{
ChanID: lnwire.NewShortChanIDFromInt(cid),
Expand All @@ -195,7 +203,7 @@ func (nc dbNodeCached) ForEachChannel(cb func(ChannelEdge) error) error {
},
}

if err := cb(edge); err != nil {
if err := cb(ctx, edge); err != nil {
return err
}
}
Expand All @@ -208,7 +216,9 @@ func (nc dbNodeCached) ForEachChannel(cb func(ChannelEdge) error) error {
// error, then execution should be terminated.
//
// NOTE: Part of the autopilot.ChannelGraph interface.
func (dc *databaseChannelGraphCached) ForEachNode(cb func(Node) error) error {
func (dc *databaseChannelGraphCached) ForEachNode(ctx context.Context,
cb func(context.Context, Node) error) error {

return dc.db.ForEachNodeCached(func(n route.Vertex,
channels map[uint64]*graphdb.DirectedChannel) error {

Expand All @@ -217,7 +227,8 @@ func (dc *databaseChannelGraphCached) ForEachNode(cb func(Node) error) error {
node: n,
channels: channels,
}
return cb(node)

return cb(ctx, node)
}
return nil
})
Expand Down Expand Up @@ -262,9 +273,11 @@ func (m memNode) Addrs() []net.Addr {
// describes the active channel.
//
// NOTE: Part of the autopilot.Node interface.
func (m memNode) ForEachChannel(cb func(ChannelEdge) error) error {
func (m memNode) ForEachChannel(ctx context.Context,
cb func(context.Context, ChannelEdge) error) error {

for _, channel := range m.chans {
if err := cb(channel); err != nil {
if err := cb(ctx, channel); err != nil {
return err
}
}
Expand Down
Loading
Loading