diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go index 8d785a81a75..4e1bbf7b1d3 100644 --- a/autopilot/prefattach_test.go +++ b/autopilot/prefattach_test.go @@ -488,7 +488,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey, Capacity: capacity, } edge.AddNodeKeys(lnNode1, lnNode2, lnNode1, lnNode2) - if err := d.db.AddChannelEdge(edge); err != nil { + if err := d.db.AddChannelEdge(ctx, edge); err != nil { return nil, nil, err } edgePolicy := &models.ChannelEdgePolicy{ @@ -504,7 +504,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey, ChannelFlags: 0, } - if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil { + if err := d.db.UpdateEdgePolicy(ctx, edgePolicy); err != nil { return nil, nil, err } edgePolicy = &models.ChannelEdgePolicy{ @@ -519,7 +519,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey, MessageFlags: 1, ChannelFlags: 1, } - if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil { + if err := d.db.UpdateEdgePolicy(ctx, edgePolicy); err != nil { return nil, nil, err } diff --git a/discovery/chan_series.go b/discovery/chan_series.go index 696a908c4b1..a6787edf969 100644 --- a/discovery/chan_series.go +++ b/discovery/chan_series.go @@ -1,6 +1,7 @@ package discovery import ( + "context" "time" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -22,7 +23,8 @@ type ChannelGraphTimeSeries interface { // height that's close to the current tip of the main chain as we // know it. We'll use this to start our QueryChannelRange dance with // the remote node. - HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) + HighestChanID(ctx context.Context, + chain chainhash.Hash) (*lnwire.ShortChannelID, error) // UpdatesInHorizon returns all known channel and node updates with an // update timestamp between the start time and end time. We'll use this @@ -87,8 +89,10 @@ func NewChanSeries(graph *graphdb.ChannelGraph) *ChanSeries { // this to start our QueryChannelRange dance with the remote node. // // NOTE: This is part of the ChannelGraphTimeSeries interface. -func (c *ChanSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) { - chanID, err := c.graph.HighestChanID() +func (c *ChanSeries) HighestChanID(ctx context.Context, + _ chainhash.Hash) (*lnwire.ShortChannelID, error) { + + chanID, err := c.graph.HighestChanID(ctx) if err != nil { return nil, err } diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 2f390da5b7a..18b603e1be3 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -2288,7 +2288,7 @@ func (d *AuthenticatedGossiper) isMsgStale(_ context.Context, // updateChannel creates a new fully signed update for the channel, and updates // the underlying graph with the new state. -func (d *AuthenticatedGossiper) updateChannel(_ context.Context, +func (d *AuthenticatedGossiper) updateChannel(ctx context.Context, info *models.ChannelEdgeInfo, edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1, *lnwire.ChannelUpdate1, error) { @@ -2322,7 +2322,7 @@ func (d *AuthenticatedGossiper) updateChannel(_ context.Context, } // Finally, we'll write the new edge policy to disk. - if err := d.cfg.Graph.UpdateEdge(edge); err != nil { + if err := d.cfg.Graph.UpdateEdge(ctx, edge); err != nil { return nil, nil, err } @@ -2808,7 +2808,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context, // We will add the edge to the channel router. If the nodes present in // this channel are not present in the database, a partial node will be // added to represent each node while we wait for a node announcement. - err = d.cfg.Graph.AddEdge(edge, ops...) + err = d.cfg.Graph.AddEdge(ctx, edge, ops...) if err != nil { log.Debugf("Graph rejected edge for short_chan_id(%v): %v", scid.ToUint64(), err) @@ -3263,7 +3263,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context, ExtraOpaqueData: upd.ExtraOpaqueData, } - if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil { + if err := d.cfg.Graph.UpdateEdge(ctx, update, ops...); err != nil { if graph.IsError( err, graph.ErrOutdated, graph.ErrIgnored, diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index fb219f41c7b..dd8eca40dc4 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -136,8 +136,8 @@ func (r *mockGraphSource) IsZombieEdge(chanID lnwire.ShortChannelID) (bool, return ok, nil } -func (r *mockGraphSource) AddEdge(info *models.ChannelEdgeInfo, - _ ...batch.SchedulerOption) error { +func (r *mockGraphSource) AddEdge(_ context.Context, + info *models.ChannelEdgeInfo, _ ...batch.SchedulerOption) error { r.mu.Lock() defer r.mu.Unlock() @@ -161,8 +161,8 @@ func (r *mockGraphSource) queueValidationFail(chanID uint64) { r.chansToReject[chanID] = struct{}{} } -func (r *mockGraphSource) UpdateEdge(edge *models.ChannelEdgePolicy, - _ ...batch.SchedulerOption) error { +func (r *mockGraphSource) UpdateEdge(_ context.Context, + edge *models.ChannelEdgePolicy, _ ...batch.SchedulerOption) error { r.mu.Lock() defer func() { diff --git a/discovery/syncer.go b/discovery/syncer.go index ccea2dc292e..0ebfac4c211 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -965,12 +965,14 @@ func (g *GossipSyncer) processChanRangeReply(_ context.Context, // party when we're kicking off the channel graph synchronization upon // connection. The historicalQuery boolean can be used to generate a query from // the genesis block of the chain. -func (g *GossipSyncer) genChanRangeQuery(_ context.Context, +func (g *GossipSyncer) genChanRangeQuery(ctx context.Context, historicalQuery bool) (*lnwire.QueryChannelRange, error) { // First, we'll query our channel graph time series for its highest // known channel ID. - newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash) + newestChan, err := g.cfg.channelSeries.HighestChanID( + ctx, g.cfg.chainHash, + ) if err != nil { return nil, err } diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 44e8d6d701b..5d5e82ef5ed 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -79,9 +79,12 @@ func newMockChannelGraphTimeSeries( } } -func (m *mockChannelGraphTimeSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) { +func (m *mockChannelGraphTimeSeries) HighestChanID(_ context.Context, + _ chainhash.Hash) (*lnwire.ShortChannelID, error) { + return &m.highestID, nil } + func (m *mockChannelGraphTimeSeries) UpdatesInHorizon(chain chainhash.Hash, startTime time.Time, endTime time.Time) ([]lnwire.Message, error) { diff --git a/graph/builder.go b/graph/builder.go index ce8dff17936..028e1c63132 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -915,6 +915,8 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error { // ApplyChannelUpdate validates a channel update and if valid, applies it to the // database. It returns a bool indicating whether the updates were successful. func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool { + ctx := context.TODO() + ch, _, _, err := b.GetChannelByID(msg.ShortChannelID) if err != nil { log.Errorf("Unable to retrieve channel by id: %v", err) @@ -959,7 +961,7 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool { ExtraOpaqueData: msg.ExtraOpaqueData, } - err = b.UpdateEdge(update) + err = b.UpdateEdge(ctx, update) if err != nil && !IsError(err, ErrIgnored, ErrOutdated) { log.Errorf("Unable to apply channel update: %v", err) return false @@ -1017,10 +1019,10 @@ func (b *Builder) addNode(ctx context.Context, node *models.LightningNode, // in construction of payment path. // // NOTE: This method is part of the ChannelGraphSource interface. -func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, +func (b *Builder) AddEdge(ctx context.Context, edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error { - err := b.addEdge(edge, op...) + err := b.addEdge(ctx, edge, op...) if err != nil { logNetworkMsgProcessError(err) @@ -1038,7 +1040,7 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, // // TODO(elle): this currently also does funding-transaction validation. But this // should be moved to the gossiper instead. -func (b *Builder) addEdge(edge *models.ChannelEdgeInfo, +func (b *Builder) addEdge(ctx context.Context, edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error { log.Debugf("Received ChannelEdgeInfo for channel %v", edge.ChannelID) @@ -1061,7 +1063,7 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo, edge.ChannelID) } - if err := b.cfg.Graph.AddChannelEdge(edge, op...); err != nil { + if err := b.cfg.Graph.AddChannelEdge(ctx, edge, op...); err != nil { return fmt.Errorf("unable to add edge: %w", err) } @@ -1118,10 +1120,10 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo, // considered as not fully constructed. // // NOTE: This method is part of the ChannelGraphSource interface. -func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, - op ...batch.SchedulerOption) error { +func (b *Builder) UpdateEdge(ctx context.Context, + update *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { - err := b.updateEdge(update, op...) + err := b.updateEdge(ctx, update, op...) if err != nil { logNetworkMsgProcessError(err) @@ -1135,8 +1137,8 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, // persisted in the graph, and then applies it to the graph if the update is // considered fresh enough and if we actually have a channel persisted for the // given update. -func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy, - op ...batch.SchedulerOption) error { +func (b *Builder) updateEdge(ctx context.Context, + policy *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { log.Debugf("Received ChannelEdgePolicy for channel %v", policy.ChannelID) @@ -1209,7 +1211,7 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy, // Now that we know this isn't a stale update, we'll apply the new edge // policy to the proper directional edge within the channel graph. - if err = b.cfg.Graph.UpdateEdgePolicy(policy, op...); err != nil { + if err = b.cfg.Graph.UpdateEdgePolicy(ctx, policy, op...); err != nil { err := errors.Errorf("unable to add channel: %v", err) log.Error(err) return err diff --git a/graph/builder_test.go b/graph/builder_test.go index 8795fc85768..3abb345e003 100644 --- a/graph/builder_test.go +++ b/graph/builder_test.go @@ -44,6 +44,7 @@ const ( // info was added to the database. func TestAddProof(t *testing.T) { t.Parallel() + ctxb := context.Background() ctx := createTestCtxSingleNode(t, 0) @@ -75,7 +76,7 @@ func TestAddProof(t *testing.T) { copy(edge.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - require.NoError(t, ctx.builder.AddEdge(edge)) + require.NoError(t, ctx.builder.AddEdge(ctxb, edge)) // Now we'll attempt to update the proof and check that it has been // properly updated. @@ -117,6 +118,7 @@ func TestIgnoreNodeAnnouncement(t *testing.T) { // ignore a channel policy for a channel not in the graph. func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 @@ -170,18 +172,18 @@ func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) { // Attempt to update the edge. This should be ignored, since the edge // is not yet added to the router. - err = ctx.builder.UpdateEdge(edgePolicy) + err = ctx.builder.UpdateEdge(ctxb, edgePolicy) if !IsError(err, ErrIgnored) { t.Fatalf("expected to get ErrIgnore, instead got: %v", err) } // Add the edge. - require.NoErrorf(t, ctx.builder.AddEdge(edge), "expected to be able "+ - "to add edge to the channel graph, even though the vertexes "+ - "were unknown: %v.", err) + require.NoErrorf(t, ctx.builder.AddEdge(ctxb, edge), + "expected to be able to add edge to the channel graph, even "+ + "though the vertexes were unknown: %v.", err) // Now updating the edge policy should succeed. - require.NoError(t, ctx.builder.UpdateEdge(edgePolicy)) + require.NoError(t, ctx.builder.UpdateEdge(ctxb, edgePolicy)) } // TestWakeUpOnStaleBranch tests that upon startup of the ChannelRouter, if the @@ -190,6 +192,7 @@ func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) { // confirmed on the stale chain, and resync to the main chain. func TestWakeUpOnStaleBranch(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxSingleNode(t, startingBlockHeight) @@ -283,7 +286,7 @@ func TestWakeUpOnStaleBranch(t *testing.T) { copy(edge1.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge1.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge1); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge1); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -302,7 +305,7 @@ func TestWakeUpOnStaleBranch(t *testing.T) { copy(edge2.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge2.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge2); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge2); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -347,7 +350,7 @@ func TestWakeUpOnStaleBranch(t *testing.T) { // Give time to process new blocks. time.Sleep(time.Millisecond * 500) - selfNode, err := ctx.graph.SourceNode() + selfNode, err := ctx.graph.SourceNode(context.Background()) require.NoError(t, err) // Create new router with same graph database. @@ -401,6 +404,7 @@ func TestWakeUpOnStaleBranch(t *testing.T) { // it is active. func TestDisconnectedBlocks(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxSingleNode(t, startingBlockHeight) @@ -492,7 +496,7 @@ func TestDisconnectedBlocks(t *testing.T) { copy(edge1.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge1.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge1); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge1); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -513,7 +517,7 @@ func TestDisconnectedBlocks(t *testing.T) { copy(edge2.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge2.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge2); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge2); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -599,6 +603,7 @@ func TestDisconnectedBlocks(t *testing.T) { // ChannelRouter, then the channels are properly pruned. func TestChansClosedOfflinePruneGraph(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxSingleNode(t, startingBlockHeight) @@ -644,7 +649,7 @@ func TestChansClosedOfflinePruneGraph(t *testing.T) { } copy(edge1.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge1.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge1); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge1); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -1047,7 +1052,7 @@ func TestIsStaleNode(t *testing.T) { AuthProof: nil, FundingScript: fn.Some(script), } - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -1092,6 +1097,7 @@ func TestIsStaleNode(t *testing.T) { // channel announcements. func TestIsKnownEdge(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxSingleNode(t, startingBlockHeight) @@ -1125,7 +1131,7 @@ func TestIsKnownEdge(t *testing.T) { AuthProof: nil, FundingScript: fn.Some(script), } - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -1140,6 +1146,7 @@ func TestIsKnownEdge(t *testing.T) { // stale channel edge update announcements. func TestIsStaleEdgePolicy(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxFromFile(t, startingBlockHeight, basicGraphFilePath) @@ -1183,7 +1190,7 @@ func TestIsStaleEdgePolicy(t *testing.T) { AuthProof: nil, FundingScript: fn.Some(script), } - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -1198,7 +1205,7 @@ func TestIsStaleEdgePolicy(t *testing.T) { FeeProportionalMillionths: 10000, } edgePolicy.ChannelFlags = 0 - if err := ctx.builder.UpdateEdge(edgePolicy); err != nil { + if err := ctx.builder.UpdateEdge(ctxb, edgePolicy); err != nil { t.Fatalf("unable to update edge policy: %v", err) } @@ -1212,7 +1219,7 @@ func TestIsStaleEdgePolicy(t *testing.T) { FeeProportionalMillionths: 10000, } edgePolicy.ChannelFlags = 1 - if err := ctx.builder.UpdateEdge(edgePolicy); err != nil { + if err := ctx.builder.UpdateEdge(ctxb, edgePolicy); err != nil { t.Fatalf("unable to update edge policy: %v", err) } @@ -1445,7 +1452,7 @@ func parseTestGraph(t *testing.T, useCache bool, path string) ( if source != nil { // Set the selected source node - if err := graph.SetSourceNode(source); err != nil { + if err := graph.SetSourceNode(ctx, source); err != nil { return nil, err } } @@ -1501,7 +1508,7 @@ func parseTestGraph(t *testing.T, useCache bool, path string) ( ), } - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) if err != nil && !errors.Is(err, graphdb.ErrEdgeAlreadyExist) { return nil, err } @@ -1536,7 +1543,7 @@ func parseTestGraph(t *testing.T, useCache bool, path string) ( ), ToNode: targetNode, } - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edgePolicy); err != nil { return nil, err } @@ -1785,7 +1792,7 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, return nil, err } - if err = graph.SetSourceNode(dbNode); err != nil { + if err = graph.SetSourceNode(ctx, dbNode); err != nil { return nil, err } @@ -1861,7 +1868,7 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, BitcoinKey2Bytes: node2Vertex, } - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) if err != nil && !errors.Is(err, graphdb.ErrEdgeAlreadyExist) { @@ -1905,7 +1912,7 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, ToNode: node2Vertex, ExtraOpaqueData: getExtraData(node1), } - err := graph.UpdateEdgePolicy(edgePolicy) + err := graph.UpdateEdgePolicy(ctx, edgePolicy) if err != nil { return nil, err } @@ -1936,7 +1943,7 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, ToNode: node1Vertex, ExtraOpaqueData: getExtraData(node2), } - err := graph.UpdateEdgePolicy(edgePolicy) + err := graph.UpdateEdgePolicy(ctx, edgePolicy) if err != nil { return nil, err } diff --git a/graph/db/graph.go b/graph/db/graph.go index fb2a9975a1a..7150107a286 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -307,10 +307,10 @@ func (c *ChannelGraph) DeleteLightningNode(ctx context.Context, // involved in creation of the channel, and the set of features that the channel // supports. The chanPoint and chanID are used to uniquely identify the edge // globally within the database. -func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo, - op ...batch.SchedulerOption) error { +func (c *ChannelGraph) AddChannelEdge(ctx context.Context, + edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error { - err := c.V1Store.AddChannelEdge(edge, op...) + err := c.V1Store.AddChannelEdge(ctx, edge, op...) if err != nil { return err } @@ -565,10 +565,10 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, // updated, otherwise it's the second node's information. The node ordering is // determined by the lexicographical ordering of the identity public keys of the // nodes on either side of the channel. -func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, - op ...batch.SchedulerOption) error { +func (c *ChannelGraph) UpdateEdgePolicy(ctx context.Context, + edge *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { - from, to, err := c.V1Store.UpdateEdgePolicy(edge, op...) + from, to, err := c.V1Store.UpdateEdgePolicy(ctx, edge, op...) if err != nil { return err } diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 789a4c0fb6a..dbd828527e7 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -276,7 +276,7 @@ func TestPartialNode(t *testing.T) { // Create an edge attached to these nodes and add it to the graph. edgeInfo, _ := createEdge(140, 0, 0, 0, &node1, &node2) - require.NoError(t, graph.AddChannelEdge(&edgeInfo)) + require.NoError(t, graph.AddChannelEdge(ctx, &edgeInfo)) // Both of the nodes should now be in both the graph (as partial/shell) // nodes _and_ the cache should also have an awareness of both nodes. @@ -348,7 +348,7 @@ func TestAliasLookup(t *testing.T) { // the one which the test node was assigned. nodePub, err := testNode.PubKey() require.NoError(t, err, "unable to generate pubkey") - dbAlias, err := graph.LookupAlias(nodePub) + dbAlias, err := graph.LookupAlias(ctx, nodePub) require.NoError(t, err, "unable to find alias") require.Equal(t, testNode.Alias, dbAlias) @@ -356,13 +356,14 @@ func TestAliasLookup(t *testing.T) { node := createTestVertex(t) nodePub, err = node.PubKey() require.NoError(t, err, "unable to generate pubkey") - _, err = graph.LookupAlias(nodePub) + _, err = graph.LookupAlias(ctx, nodePub) require.ErrorIs(t, err, ErrNodeAliasNotFound) } // TestSourceNode tests the source node functionality of the graph store. func TestSourceNode(t *testing.T) { t.Parallel() + ctx := context.Background() graph := MakeTestGraphNew(t) @@ -372,16 +373,16 @@ func TestSourceNode(t *testing.T) { // Attempt to fetch the source node, this should return an error as the // source node hasn't yet been set. - _, err := graph.SourceNode() + _, err := graph.SourceNode(ctx) require.ErrorIs(t, err, ErrSourceNodeNotSet) // Set the source node, this should insert the node into the // database in a special way indicating it's the source node. - require.NoError(t, graph.SetSourceNode(testNode)) + require.NoError(t, graph.SetSourceNode(ctx, testNode)) // Retrieve the source node from the database, it should exactly match // the one we set above. - sourceNode, err := graph.SourceNode() + sourceNode, err := graph.SourceNode(ctx) require.NoError(t, err, "unable to fetch source node") compareNodes(t, testNode, sourceNode) } @@ -389,6 +390,7 @@ func TestSourceNode(t *testing.T) { // TestEdgeInsertionDeletion tests the basic CRUD operations for channel edges. func TestEdgeInsertionDeletion(t *testing.T) { t.Parallel() + ctx := context.Background() graph := MakeTestGraph(t) @@ -428,12 +430,12 @@ func TestEdgeInsertionDeletion(t *testing.T) { copy(edgeInfo.BitcoinKey1Bytes[:], node1Pub.SerializeCompressed()) copy(edgeInfo.BitcoinKey2Bytes[:], node2Pub.SerializeCompressed()) - require.NoError(t, graph.AddChannelEdge(&edgeInfo)) + require.NoError(t, graph.AddChannelEdge(ctx, &edgeInfo)) assertEdgeWithNoPoliciesInCache(t, graph, &edgeInfo) // Show that trying to insert the same channel again will return the // expected error. - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) require.ErrorIs(t, err, ErrEdgeAlreadyExist) // Ensure that both policies are returned as unknown (nil). @@ -511,11 +513,12 @@ func createEdge(height, txIndex uint32, txPosition uint16, outPointIndex uint32, // database is what we expect after calling DisconnectBlockAtHeight. func TestDisconnectBlockAtHeight(t *testing.T) { t.Parallel() + ctx := context.Background() graph := MakeTestGraph(t) sourceNode := createTestVertex(t) - if err := graph.SetSourceNode(sourceNode); err != nil { + if err := graph.SetSourceNode(ctx, sourceNode); err != nil { t.Fatalf("unable to set source node: %v", err) } @@ -560,15 +563,15 @@ func TestDisconnectBlockAtHeight(t *testing.T) { edgeInfo3, _ := createEdge(height-1, 0, 0, 2, node1, node2) // Now add all these new edges to the database. - if err := graph.AddChannelEdge(&edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo); err != nil { t.Fatalf("unable to create channel edge: %v", err) } - if err := graph.AddChannelEdge(&edgeInfo2); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo2); err != nil { t.Fatalf("unable to create channel edge: %v", err) } - if err := graph.AddChannelEdge(&edgeInfo3); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo3); err != nil { t.Fatalf("unable to create channel edge: %v", err) } assertEdgeWithNoPoliciesInCache(t, graph, &edgeInfo) @@ -829,12 +832,12 @@ func TestEdgeInfoUpdates(t *testing.T) { // Make sure inserting the policy at this point, before the edge info // is added, will fail. - err := graph.UpdateEdgePolicy(edge1) + err := graph.UpdateEdgePolicy(ctx, edge1) require.ErrorIs(t, err, ErrEdgeNotFound) require.Len(t, graph.graphCache.nodeChannels, 0) // Add the edge info. - if err := graph.AddChannelEdge(edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, edgeInfo); err != nil { t.Fatalf("unable to create channel edge: %v", err) } assertEdgeWithNoPoliciesInCache(t, graph, edgeInfo) @@ -844,11 +847,11 @@ func TestEdgeInfoUpdates(t *testing.T) { // Next, insert both edge policies into the database, they should both // be inserted without any issues. - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } assertEdgeWithPolicyInCache(t, graph, edgeInfo, edge1, true) - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } assertEdgeWithPolicyInCache(t, graph, edgeInfo, edge2, false) @@ -1125,6 +1128,7 @@ func newEdgePolicy(chanID uint64, updateTime int64) *models.ChannelEdgePolicy { // TestAddEdgeProof tests the ability to add an edge proof to an existing edge. func TestAddEdgeProof(t *testing.T) { t.Parallel() + ctx := context.Background() graph := MakeTestGraph(t) @@ -1132,7 +1136,7 @@ func TestAddEdgeProof(t *testing.T) { node1 := createTestVertex(t) node2 := createTestVertex(t) edge1, _, _ := createChannelEdge(node1, node2, withSkipProofs()) - require.NoError(t, graph.AddChannelEdge(edge1)) + require.NoError(t, graph.AddChannelEdge(ctx, edge1)) // Fetch the edge and assert that the proof is nil and that the rest // of the edge info is correct. @@ -1153,7 +1157,7 @@ func TestAddEdgeProof(t *testing.T) { // to call AddChannelEdge again - this should fail due to the channel // already existing. edge1.AuthProof = proof - err = graph.AddChannelEdge(edge1) + err = graph.AddChannelEdge(ctx, edge1) require.Error(t, err, ErrEdgeAlreadyExist) // Now add just the proof. @@ -1169,7 +1173,7 @@ func TestAddEdgeProof(t *testing.T) { // For completeness, also test the case where we insert a new edge with // an edge proof. Show that the proof is present from the get go. edge2, _, _ := createChannelEdge(node1, node2) - require.NoError(t, graph.AddChannelEdge(edge2)) + require.NoError(t, graph.AddChannelEdge(ctx, edge2)) // Fetch the edge and assert that the proof is nil and that the rest // of the edge info is correct. @@ -1183,12 +1187,13 @@ func TestAddEdgeProof(t *testing.T) { // correctly iterates through the channels of the set source node. func TestForEachSourceNodeChannel(t *testing.T) { t.Parallel() + ctx := context.Background() graph := MakeTestGraphNew(t) // Create a source node (A) and set it as such in the DB. nodeA := createTestVertex(t) - require.NoError(t, graph.SetSourceNode(nodeA)) + require.NoError(t, graph.SetSourceNode(ctx, nodeA)) // Now, create a few more nodes (B, C, D) along with some channels // between them. We'll create the following graph: @@ -1208,11 +1213,11 @@ func TestForEachSourceNodeChannel(t *testing.T) { nodeD := createTestVertex(t) abEdge, abPolicy1, abPolicy2 := createChannelEdge(nodeA, nodeB) - require.NoError(t, graph.AddChannelEdge(abEdge)) + require.NoError(t, graph.AddChannelEdge(ctx, abEdge)) acEdge, acPolicy1, acPolicy2 := createChannelEdge(nodeA, nodeC) - require.NoError(t, graph.AddChannelEdge(acEdge)) + require.NoError(t, graph.AddChannelEdge(ctx, acEdge)) bdEdge, _, _ := createChannelEdge(nodeB, nodeD) - require.NoError(t, graph.AddChannelEdge(bdEdge)) + require.NoError(t, graph.AddChannelEdge(ctx, bdEdge)) // Figure out which of the policies returned above are node A's so that // we know which to persist. @@ -1222,14 +1227,14 @@ func TestForEachSourceNodeChannel(t *testing.T) { if !bytes.Equal(abPolicy1.ToNode[:], nodeB.PubKeyBytes[:]) { abPolicyAOutgoing = abPolicy2 } - require.NoError(t, graph.UpdateEdgePolicy(abPolicyAOutgoing)) + require.NoError(t, graph.UpdateEdgePolicy(ctx, abPolicyAOutgoing)) // Now, set the incoming policy for the A-C channel. acPolicyAIncoming := acPolicy1 if !bytes.Equal(acPolicy1.ToNode[:], nodeA.PubKeyBytes[:]) { acPolicyAIncoming = acPolicy2 } - require.NoError(t, graph.UpdateEdgePolicy(acPolicyAIncoming)) + require.NoError(t, graph.UpdateEdgePolicy(ctx, acPolicyAIncoming)) type sourceNodeChan struct { otherNode route.Vertex @@ -1552,7 +1557,7 @@ func fillTestGraph(t testing.TB, graph *ChannelGraph, numNodes, copy(edgeInfo.NodeKey2Bytes[:], node2.PubKeyBytes[:]) copy(edgeInfo.BitcoinKey1Bytes[:], node1.PubKeyBytes[:]) copy(edgeInfo.BitcoinKey2Bytes[:], node2.PubKeyBytes[:]) - err := graph.AddChannelEdge(&edgeInfo) + err := graph.AddChannelEdge(ctx, &edgeInfo) require.NoError(t, err) // Create and add an edge with random data that points @@ -1561,7 +1566,7 @@ func fillTestGraph(t testing.TB, graph *ChannelGraph, numNodes, edge.ChannelFlags = 0 edge.ToNode = node2.PubKeyBytes edge.SigBytes = testSig.Serialize() - require.NoError(t, graph.UpdateEdgePolicy(edge)) + require.NoError(t, graph.UpdateEdgePolicy(ctx, edge)) // Create another random edge that points from // node2 -> node1 this time. @@ -1569,7 +1574,7 @@ func fillTestGraph(t testing.TB, graph *ChannelGraph, numNodes, edge.ChannelFlags = 1 edge.ToNode = node1.PubKeyBytes edge.SigBytes = testSig.Serialize() - require.NoError(t, graph.UpdateEdgePolicy(edge)) + require.NoError(t, graph.UpdateEdgePolicy(ctx, edge)) chanIndex[chanID] = struct{}{} } @@ -1685,7 +1690,7 @@ func TestGraphPruning(t *testing.T) { graph := MakeTestGraph(t) sourceNode := createTestVertex(t) - if err := graph.SetSourceNode(sourceNode); err != nil { + if err := graph.SetSourceNode(ctx, sourceNode); err != nil { t.Fatalf("unable to set source node: %v", err) } @@ -1737,7 +1742,7 @@ func TestGraphPruning(t *testing.T) { edgeInfo.BitcoinKey2Bytes[:], graphNodes[i+1].PubKeyBytes[:], ) - if err := graph.AddChannelEdge(&edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo); err != nil { t.Fatalf("unable to add node: %v", err) } @@ -1759,7 +1764,7 @@ func TestGraphPruning(t *testing.T) { edge.ChannelFlags = 0 edge.ToNode = graphNodes[i].PubKeyBytes edge.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -1769,7 +1774,7 @@ func TestGraphPruning(t *testing.T) { edge.ChannelFlags = 1 edge.ToNode = graphNodes[i].PubKeyBytes edge.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge); err != nil { t.Fatalf("unable to update edge: %v", err) } } @@ -1870,12 +1875,13 @@ func TestGraphPruning(t *testing.T) { // known channel ID in the database. func TestHighestChanID(t *testing.T) { t.Parallel() + ctx := context.Background() graph := MakeTestGraphNew(t) // If we don't yet have any channels in the database, then we should // get a channel ID of zero if we ask for the highest channel ID. - bestID, err := graph.HighestChanID() + bestID, err := graph.HighestChanID(ctx) require.NoError(t, err, "unable to get highest ID") if bestID != 0 { t.Fatalf("best ID w/ no chan should be zero, is instead: %v", @@ -1892,16 +1898,16 @@ func TestHighestChanID(t *testing.T) { edge1, _ := createEdge(10, 0, 0, 0, node1, node2) edge2, chanID2 := createEdge(100, 0, 0, 0, node1, node2) - if err := graph.AddChannelEdge(&edge1); err != nil { + if err := graph.AddChannelEdge(ctx, &edge1); err != nil { t.Fatalf("unable to create channel edge: %v", err) } - if err := graph.AddChannelEdge(&edge2); err != nil { + if err := graph.AddChannelEdge(ctx, &edge2); err != nil { t.Fatalf("unable to create channel edge: %v", err) } // Now that the edges has been inserted, we'll query for the highest // known channel ID in the database. - bestID, err = graph.HighestChanID() + bestID, err = graph.HighestChanID(ctx) require.NoError(t, err, "unable to get highest ID") if bestID != chanID2.ToUint64() { @@ -1912,10 +1918,10 @@ func TestHighestChanID(t *testing.T) { // If we add another edge, then the current best chan ID should be // updated as well. edge3, chanID3 := createEdge(1000, 0, 0, 0, node1, node2) - if err := graph.AddChannelEdge(&edge3); err != nil { + if err := graph.AddChannelEdge(ctx, &edge3); err != nil { t.Fatalf("unable to create channel edge: %v", err) } - bestID, err = graph.HighestChanID() + bestID, err = graph.HighestChanID(ctx) require.NoError(t, err, "unable to get highest ID") if bestID != chanID3.ToUint64() { @@ -1965,7 +1971,7 @@ func TestChanUpdatesInHorizon(t *testing.T) { uint32(i*10), 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&channel); err != nil { + if err := graph.AddChannelEdge(ctx, &channel); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -1979,7 +1985,7 @@ func TestChanUpdatesInHorizon(t *testing.T) { edge1.ChannelFlags = 0 edge1.ToNode = node2.PubKeyBytes edge1.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -1989,7 +1995,7 @@ func TestChanUpdatesInHorizon(t *testing.T) { edge2.ChannelFlags = 1 edge2.ToNode = node1.PubKeyBytes edge2.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -2306,7 +2312,7 @@ func TestFilterKnownChanIDs(t *testing.T) { uint32(i*10), 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&channel); err != nil { + if err := graph.AddChannelEdge(ctx, &channel); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -2321,7 +2327,7 @@ func TestFilterKnownChanIDs(t *testing.T) { channel, chanID := createEdge( uint32(i*10+1), 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&channel); err != nil { + if err := graph.AddChannelEdge(ctx, &channel); err != nil { t.Fatalf("unable to create channel edge: %v", err) } err := graph.DeleteChannelEdges(false, true, channel.ChannelID) @@ -2444,7 +2450,7 @@ func TestStressTestChannelGraphAPI(t *testing.T) { node2 := createTestVertex(t) require.NoError(t, graph.AddLightningNode(ctx, node2)) - require.NoError(t, graph.SetSourceNode(node1)) + require.NoError(t, graph.SetSourceNode(ctx, node1)) type chanInfo struct { info models.ChannelEdgeInfo @@ -2678,7 +2684,7 @@ func TestStressTestChannelGraphAPI(t *testing.T) { fn: func() error { channel := addNewChan() - return graph.AddChannelEdge(&channel.info) + return graph.AddChannelEdge(ctx, &channel.info) }, }, } @@ -2764,12 +2770,14 @@ func TestFilterChannelRange(t *testing.T) { var updateTime = time.Unix(0, 0) if rand.Int31n(2) == 0 { updateTime = time.Unix(updateTimeSeed, 0) - err = graph.UpdateEdgePolicy(&models.ChannelEdgePolicy{ - ToNode: node.PubKeyBytes, - ChannelFlags: chanFlags, - ChannelID: chanID, - LastUpdate: updateTime, - }) + err = graph.UpdateEdgePolicy( + ctx, &models.ChannelEdgePolicy{ + ToNode: node.PubKeyBytes, + ChannelFlags: chanFlags, + ChannelID: chanID, + LastUpdate: updateTime, + }, + ) require.NoError(t, err) } updateTimeSeed++ @@ -2782,12 +2790,12 @@ func TestFilterChannelRange(t *testing.T) { channel1, chanID1 := createEdge( chanHeight, uint32(i+1), 0, 0, node1, node2, ) - require.NoError(t, graph.AddChannelEdge(&channel1)) + require.NoError(t, graph.AddChannelEdge(ctx, &channel1)) channel2, chanID2 := createEdge( chanHeight, uint32(i+2), 0, 0, node1, node2, ) - require.NoError(t, graph.AddChannelEdge(&channel2)) + require.NoError(t, graph.AddChannelEdge(ctx, &channel2)) chanInfo1 := NewChannelUpdateInfo( chanID1, time.Time{}, time.Time{}, @@ -2963,7 +2971,7 @@ func TestFetchChanInfos(t *testing.T) { uint32(i*10), 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&channel); err != nil { + if err := graph.AddChannelEdge(ctx, &channel); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -2974,7 +2982,7 @@ func TestFetchChanInfos(t *testing.T) { edge1.ChannelFlags = 0 edge1.ToNode = node2.PubKeyBytes edge1.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -2982,7 +2990,7 @@ func TestFetchChanInfos(t *testing.T) { edge2.ChannelFlags = 1 edge2.ToNode = node1.PubKeyBytes edge2.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3004,7 +3012,7 @@ func TestFetchChanInfos(t *testing.T) { zombieChan, zombieChanID := createEdge( 666, 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&zombieChan); err != nil { + if err := graph.AddChannelEdge(ctx, &zombieChan); err != nil { t.Fatalf("unable to create channel edge: %v", err) } err := graph.DeleteChannelEdges(false, true, zombieChan.ChannelID) @@ -3057,7 +3065,7 @@ func TestIncompleteChannelPolicies(t *testing.T) { uint32(0), 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&channel); err != nil { + if err := graph.AddChannelEdge(ctx, &channel); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -3109,7 +3117,7 @@ func TestIncompleteChannelPolicies(t *testing.T) { edgePolicy.ChannelFlags = 0 edgePolicy.ToNode = node2.PubKeyBytes edgePolicy.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edgePolicy); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3122,7 +3130,7 @@ func TestIncompleteChannelPolicies(t *testing.T) { edgePolicy.ChannelFlags = 1 edgePolicy.ToNode = node1.PubKeyBytes edgePolicy.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edgePolicy); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3146,7 +3154,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { } sourceNode := createTestVertex(t) - if err := graph.SetSourceNode(sourceNode); err != nil { + if err := graph.SetSourceNode(ctx, sourceNode); err != nil { t.Fatalf("unable to set source node: %v", err) } @@ -3164,7 +3172,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { // With the two nodes created, we'll now create a random channel, as // well as two edges in the database with distinct update times. edgeInfo, chanID := createEdge(100, 0, 0, 0, node1, node2) - if err := graph.AddChannelEdge(&edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -3172,7 +3180,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { edge1.ChannelFlags = 0 edge1.ToNode = node1.PubKeyBytes edge1.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions. @@ -3181,7 +3189,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { edge2.ChannelFlags = 1 edge2.ToNode = node2.PubKeyBytes edge2.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } edge2 = copyEdgePolicy(edge2) // Avoid read/write race conditions. @@ -3248,12 +3256,12 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { // removed from the update index. edge1.ChannelFlags = 2 edge1.LastUpdate = time.Now() - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } edge2.ChannelFlags = 3 edge2.LastUpdate = edge1.LastUpdate.Add(time.Hour) - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3290,7 +3298,7 @@ func TestPruneGraphNodes(t *testing.T) { // We'll start off by inserting our source node, to ensure that it's // the only node left after we prune the graph. sourceNode := createTestVertex(t) - if err := graph.SetSourceNode(sourceNode); err != nil { + if err := graph.SetSourceNode(ctx, sourceNode); err != nil { t.Fatalf("unable to set source node: %v", err) } @@ -3313,7 +3321,7 @@ func TestPruneGraphNodes(t *testing.T) { // We'll now add a new edge to the graph, but only actually advertise // the edge of *one* of the nodes. edgeInfo, chanID := createEdge(100, 0, 0, 0, node1, node2) - if err := graph.AddChannelEdge(&edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -3323,7 +3331,7 @@ func TestPruneGraphNodes(t *testing.T) { edge1.ChannelFlags = 0 edge1.ToNode = node1.PubKeyBytes edge1.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3356,13 +3364,13 @@ func TestAddChannelEdgeShellNodes(t *testing.T) { // To start, we'll create two nodes, and only add one of them to the // channel graph. node1 := createTestVertex(t) - require.NoError(t, graph.SetSourceNode(node1)) + require.NoError(t, graph.SetSourceNode(ctx, node1)) node2 := createTestVertex(t) // We'll now create an edge between the two nodes, as a result, node2 // should be inserted into the database as a shell node. edgeInfo, _ := createEdge(100, 0, 0, 0, node1, node2) - require.NoError(t, graph.AddChannelEdge(&edgeInfo)) + require.NoError(t, graph.AddChannelEdge(ctx, &edgeInfo)) // Ensure that node1 was inserted as a full node, while node2 only has // a shell node present. @@ -3376,7 +3384,7 @@ func TestAddChannelEdgeShellNodes(t *testing.T) { // Show that attempting to add the channel again will result in an // error. - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) require.ErrorIs(t, err, ErrEdgeAlreadyExist) // Show that updating the shell node to a full node record works. @@ -3447,19 +3455,19 @@ func TestNodeIsPublic(t *testing.T) { // some graphs but not others, etc.). aliceGraph := MakeTestGraph(t) aliceNode := createTestVertex(t) - if err := aliceGraph.SetSourceNode(aliceNode); err != nil { + if err := aliceGraph.SetSourceNode(ctx, aliceNode); err != nil { t.Fatalf("unable to set source node: %v", err) } bobGraph := MakeTestGraph(t) bobNode := createTestVertex(t) - if err := bobGraph.SetSourceNode(bobNode); err != nil { + if err := bobGraph.SetSourceNode(ctx, bobNode); err != nil { t.Fatalf("unable to set source node: %v", err) } carolGraph := MakeTestGraph(t) carolNode := createTestVertex(t) - if err := carolGraph.SetSourceNode(carolNode); err != nil { + if err := carolGraph.SetSourceNode(ctx, carolNode); err != nil { t.Fatalf("unable to set source node: %v", err) } @@ -3477,7 +3485,7 @@ func TestNodeIsPublic(t *testing.T) { require.NoError(t, err) } for _, edge := range edges { - if err := graph.AddChannelEdge(edge); err != nil { + if err := graph.AddChannelEdge(ctx, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } } @@ -3551,7 +3559,7 @@ func TestNodeIsPublic(t *testing.T) { } bobCarolEdge.AuthProof = nil - if err := graph.AddChannelEdge(&bobCarolEdge); err != nil { + if err := graph.AddChannelEdge(ctx, &bobCarolEdge); err != nil { t.Fatalf("unable to add edge: %v", err) } } @@ -3592,7 +3600,7 @@ func TestDisabledChannelIDs(t *testing.T) { t.Fatalf("unable to add node: %v", err) } - if err := graph.AddChannelEdge(edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, edgeInfo); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -3607,7 +3615,7 @@ func TestDisabledChannelIDs(t *testing.T) { // Add one disabled policy and ensure the channel is still not in the // disabled list. edge1.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } disabledChanIds, err = graph.DisabledChannelIDs() @@ -3620,7 +3628,7 @@ func TestDisabledChannelIDs(t *testing.T) { // Add second disabled policy and ensure the channel is now in the // disabled list. edge2.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } disabledChanIds, err = graph.DisabledChannelIDs() @@ -3676,7 +3684,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) { if err := graph.AddLightningNode(ctx, node2); err != nil { t.Fatalf("unable to add node: %v", err) } - if err := graph.AddChannelEdge(edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, edgeInfo); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -3746,7 +3754,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) { require.NoError(t, err, "error writing db") // And add the second, unmodified edge. - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3768,7 +3776,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) { // Now add the original, unmodified edge policy, and make sure the edge // policies then become fully populated. - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3800,6 +3808,7 @@ func assertNumZombies(t *testing.T, graph *ChannelGraph, expZombies uint64) { // TestGraphZombieIndex ensures that we can mark edges correctly as zombie/live. func TestGraphZombieIndex(t *testing.T) { t.Parallel() + ctx := context.Background() // We'll start by creating our test graph along with a test edge. graph := MakeTestGraph(t) @@ -3814,7 +3823,7 @@ func TestGraphZombieIndex(t *testing.T) { } edge, _, _ := createChannelEdge(node1, node2) - require.NoError(t, graph.AddChannelEdge(edge)) + require.NoError(t, graph.AddChannelEdge(ctx, edge)) // Since the edge is known the graph and it isn't a zombie, IsZombieEdge // should not report the channel as a zombie. @@ -3982,11 +3991,12 @@ func TestComputeFee(t *testing.T) { // executes multiple AddChannelEdge requests in a single txn. func TestBatchedAddChannelEdge(t *testing.T) { t.Parallel() + ctx := context.Background() graph := MakeTestGraph(t) sourceNode := createTestVertex(t) - require.Nil(t, graph.SetSourceNode(sourceNode)) + require.Nil(t, graph.SetSourceNode(ctx, sourceNode)) // We'd like to test the insertion/deletion of edges, so we create two // vertexes to connect. @@ -4040,7 +4050,7 @@ func TestBatchedAddChannelEdge(t *testing.T) { defer wg.Done() select { - case errChan <- graph.AddChannelEdge(&edge): + case errChan <- graph.AddChannelEdge(ctx, &edge): case <-time.After(2 * time.Second): errChan <- errTimeout } @@ -4074,10 +4084,10 @@ func TestBatchedUpdateEdgePolicy(t *testing.T) { // Make sure inserting the policy at this point, before the edge info // is added, will fail. - require.Error(t, ErrEdgeNotFound, graph.UpdateEdgePolicy(edge1)) + require.Error(t, ErrEdgeNotFound, graph.UpdateEdgePolicy(ctx, edge1)) // Add the edge info. - require.NoError(t, graph.AddChannelEdge(edgeInfo)) + require.NoError(t, graph.AddChannelEdge(ctx, edgeInfo)) errTimeout := errors.New("timeout adding batched channel") @@ -4093,7 +4103,7 @@ func TestBatchedUpdateEdgePolicy(t *testing.T) { defer wg.Done() select { - case errChan <- graph.UpdateEdgePolicy(update): + case errChan <- graph.UpdateEdgePolicy(ctx, update): case <-time.After(2 * time.Second): errChan <- errTimeout } @@ -4188,7 +4198,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) { } // Add the channel, but only insert a single edge into the graph. - require.NoError(t, graph.AddChannelEdge(edgeInfo)) + require.NoError(t, graph.AddChannelEdge(ctx, edgeInfo)) getSingleChannel := func() *DirectedChannel { var ch *DirectedChannel @@ -4218,7 +4228,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) { FeeRate: 20, } edge1.InboundFee = fn.Some(inboundFee) - require.NoError(t, graph.UpdateEdgePolicy(edge1)) + require.NoError(t, graph.UpdateEdgePolicy(ctx, edge1)) edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions. directedChan := getSingleChannel() @@ -4233,7 +4243,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) { // error when we try to update the edge policy. edge1.LastUpdate = edge1.LastUpdate.Add(time.Second) require.ErrorIs( - t, graph.UpdateEdgePolicy(edge1), ErrParsingExtraTLVBytes, + t, graph.UpdateEdgePolicy(ctx, edge1), ErrParsingExtraTLVBytes, ) // Since persistence of the last update failed, we should still bet diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index c1b3dd57874..0759637fd79 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -111,7 +111,7 @@ type V1Store interface { //nolint:interfacebloat // LookupAlias attempts to return the alias as advertised by the target // node. - LookupAlias(pub *btcec.PublicKey) (string, error) + LookupAlias(ctx context.Context, pub *btcec.PublicKey) (string, error) // DeleteLightningNode starts a new database transaction to remove a // vertex/node from the database according to the node's public key. @@ -189,7 +189,7 @@ type V1Store interface { //nolint:interfacebloat // and the set of features that the channel supports. The chanPoint and // chanID are used to uniquely identify the edge globally within the // database. - AddChannelEdge(edge *models.ChannelEdgeInfo, + AddChannelEdge(ctx context.Context, edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error // HasChannelEdge returns true if the database knows of a channel edge @@ -227,7 +227,7 @@ type V1Store interface { //nolint:interfacebloat // graph. This represents the "newest" channel from the PoV of the // chain. This method can be used by peers to quickly determine if // they're graphs are in sync. - HighestChanID() (uint64, error) + HighestChanID(ctx context.Context) (uint64, error) // ChanUpdatesInHorizon returns all the known channel edges which have // at least one edge that has an update timestamp within the specified @@ -332,19 +332,20 @@ type V1Store interface { //nolint:interfacebloat // node's information. The node ordering is determined by the // lexicographical ordering of the identity public keys of the nodes on // either side of the channel. - UpdateEdgePolicy(edge *models.ChannelEdgePolicy, + UpdateEdgePolicy(ctx context.Context, edge *models.ChannelEdgePolicy, op ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) // SourceNode returns the source node of the graph. The source node is // treated as the center node within a star-graph. This method may be // used to kick off a path finding algorithm in order to explore the // reachability of another node based off the source node. - SourceNode() (*models.LightningNode, error) + SourceNode(ctx context.Context) (*models.LightningNode, error) // SetSourceNode sets the source node within the graph database. The // source node is to be used as the center of a star-graph within path // finding algorithms. - SetSourceNode(node *models.LightningNode) error + SetSourceNode(ctx context.Context, + node *models.LightningNode) error // PruneTip returns the block height and hash of the latest block that // has been used to prune channels in the graph. Knowing the "prune tip" diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index d722e4fe011..bb5512750f5 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -874,7 +874,9 @@ func (c *KVStore) ForEachNodeCacheable(cb func(route.Vertex, // as the center node within a star-graph. This method may be used to kick off // a path finding algorithm in order to explore the reachability of another // node based off the source node. -func (c *KVStore) SourceNode() (*models.LightningNode, error) { +func (c *KVStore) SourceNode(_ context.Context) (*models.LightningNode, + error) { + var source *models.LightningNode err := kvdb.View(c.db, func(tx kvdb.RTx) error { // First grab the nodes bucket which stores the mapping from @@ -926,7 +928,9 @@ func (c *KVStore) sourceNode(nodes kvdb.RBucket) (*models.LightningNode, // SetSourceNode sets the source node within the graph database. The source // node is to be used as the center of a star-graph within path finding // algorithms. -func (c *KVStore) SetSourceNode(node *models.LightningNode) error { +func (c *KVStore) SetSourceNode(_ context.Context, + node *models.LightningNode) error { + nodePubBytes := node.PubKeyBytes[:] return kvdb.Update(c.db, func(tx kvdb.RwTx) error { @@ -993,7 +997,9 @@ func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error { // LookupAlias attempts to return the alias as advertised by the target node. // TODO(roasbeef): currently assumes that aliases are unique... -func (c *KVStore) LookupAlias(pub *btcec.PublicKey) (string, error) { +func (c *KVStore) LookupAlias(_ context.Context, + pub *btcec.PublicKey) (string, error) { + var alias string err := kvdb.View(c.db, func(tx kvdb.RTx) error { @@ -1094,10 +1100,8 @@ func (c *KVStore) deleteLightningNode(nodes kvdb.RwBucket, // involved in creation of the channel, and the set of features that the channel // supports. The chanPoint and chanID are used to uniquely identify the edge // globally within the database. -func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo, - opts ...batch.SchedulerOption) error { - - ctx := context.TODO() +func (c *KVStore) AddChannelEdge(ctx context.Context, + edge *models.ChannelEdgeInfo, opts ...batch.SchedulerOption) error { var alreadyExists bool r := &batch.Request[kvdb.RwTx]{ @@ -1944,7 +1948,7 @@ func getChanID(tx kvdb.RTx, chanPoint *wire.OutPoint) (uint64, error) { // HighestChanID returns the "highest" known channel ID in the channel graph. // This represents the "newest" channel from the PoV of the chain. This method // can be used by peers to quickly determine if they're graphs are in sync. -func (c *KVStore) HighestChanID() (uint64, error) { +func (c *KVStore) HighestChanID(_ context.Context) (uint64, error) { var cid uint64 err := kvdb.View(c.db, func(tx kvdb.RTx) error { @@ -2799,11 +2803,11 @@ func makeZombiePubkeys(info *models.ChannelEdgeInfo, // updated, otherwise it's the second node's information. The node ordering is // determined by the lexicographical ordering of the identity public keys of the // nodes on either side of the channel. -func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, +func (c *KVStore) UpdateEdgePolicy(ctx context.Context, + edge *models.ChannelEdgePolicy, opts ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) { var ( - ctx = context.TODO() isUpdate1 bool edgeNotFound bool from, to route.Vertex diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 22512619c48..564ce96c636 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -355,11 +355,10 @@ func (s *SQLStore) FetchNodeFeatures(nodePub route.Vertex) ( // LookupAlias attempts to return the alias as advertised by the target node. // // NOTE: part of the V1Store interface. -func (s *SQLStore) LookupAlias(pub *btcec.PublicKey) (string, error) { - var ( - ctx = context.TODO() - alias string - ) +func (s *SQLStore) LookupAlias(ctx context.Context, + pub *btcec.PublicKey) (string, error) { + + var alias string err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { dbNode, err := db.GetNodeByPubKey( ctx, sqlc.GetNodeByPubKeyParams{ @@ -394,8 +393,8 @@ func (s *SQLStore) LookupAlias(pub *btcec.PublicKey) (string, error) { // node based off the source node. // // NOTE: part of the V1Store interface. -func (s *SQLStore) SourceNode() (*models.LightningNode, error) { - ctx := context.TODO() +func (s *SQLStore) SourceNode(ctx context.Context) (*models.LightningNode, + error) { var node *models.LightningNode err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { @@ -421,8 +420,8 @@ func (s *SQLStore) SourceNode() (*models.LightningNode, error) { // algorithms. // // NOTE: part of the V1Store interface. -func (s *SQLStore) SetSourceNode(node *models.LightningNode) error { - ctx := context.TODO() +func (s *SQLStore) SetSourceNode(ctx context.Context, + node *models.LightningNode) error { return s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { id, err := upsertNode(ctx, db, node) @@ -501,10 +500,8 @@ func (s *SQLStore) NodeUpdatesInHorizon(startTime, // globally within the database. // // NOTE: part of the V1Store interface. -func (s *SQLStore) AddChannelEdge(edge *models.ChannelEdgeInfo, - opts ...batch.SchedulerOption) error { - - ctx := context.TODO() +func (s *SQLStore) AddChannelEdge(ctx context.Context, + edge *models.ChannelEdgeInfo, opts ...batch.SchedulerOption) error { var alreadyExists bool r := &batch.Request[SQLQueries]{ @@ -546,9 +543,7 @@ func (s *SQLStore) AddChannelEdge(edge *models.ChannelEdgeInfo, // can be used by peers to quickly determine if their graphs are in sync. // // NOTE: This is part of the V1Store interface. -func (s *SQLStore) HighestChanID() (uint64, error) { - ctx := context.TODO() - +func (s *SQLStore) HighestChanID(ctx context.Context) (uint64, error) { var highestChanID uint64 err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { chanID, err := db.HighestSCID(ctx, int16(ProtocolV1)) @@ -579,11 +574,10 @@ func (s *SQLStore) HighestChanID() (uint64, error) { // nodes on either side of the channel. // // NOTE: part of the V1Store interface. -func (s *SQLStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, +func (s *SQLStore) UpdateEdgePolicy(ctx context.Context, + edge *models.ChannelEdgePolicy, opts ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) { - ctx := context.TODO() - var ( isUpdate1 bool edgeNotFound bool diff --git a/graph/interfaces.go b/graph/interfaces.go index e9f89404150..be226a495dc 100644 --- a/graph/interfaces.go +++ b/graph/interfaces.go @@ -29,7 +29,7 @@ type ChannelGraphSource interface { // AddEdge is used to add edge/channel to the topology of the router, // after all information about channel will be gathered this // edge/channel might be used in construction of payment path. - AddEdge(edge *models.ChannelEdgeInfo, + AddEdge(ctx context.Context, edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error // AddProof updates the channel edge info with proof which is needed to @@ -39,7 +39,7 @@ type ChannelGraphSource interface { // UpdateEdge is used to update edge information, without this message // edge considered as not fully constructed. - UpdateEdge(policy *models.ChannelEdgePolicy, + UpdateEdge(ctx context.Context, policy *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error // IsStaleNode returns true if the graph source has a node announcement @@ -134,7 +134,7 @@ type DB interface { // treated as the center node within a star-graph. This method may be // used to kick off a path finding algorithm in order to explore the // reachability of another node based off the source node. - SourceNode() (*models.LightningNode, error) + SourceNode(ctx context.Context) (*models.LightningNode, error) // DisabledChannelIDs returns the channel ids of disabled channels. // A channel is disabled when two of the associated ChanelEdgePolicies @@ -215,7 +215,7 @@ type DB interface { // and the set of features that the channel supports. The chanPoint and // chanID are used to uniquely identify the edge globally within the // database. - AddChannelEdge(edge *models.ChannelEdgeInfo, + AddChannelEdge(ctx context.Context, edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error // MarkEdgeZombie attempts to mark a channel identified by its channel @@ -231,7 +231,7 @@ type DB interface { // node's information. The node ordering is determined by the // lexicographical ordering of the identity public keys of the nodes on // either side of the channel. - UpdateEdgePolicy(edge *models.ChannelEdgePolicy, + UpdateEdgePolicy(ctx context.Context, edge *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error // HasLightningNode determines if the graph has a vertex identified by diff --git a/graph/notifications_test.go b/graph/notifications_test.go index c9096efc183..7b4fa69cb5f 100644 --- a/graph/notifications_test.go +++ b/graph/notifications_test.go @@ -422,6 +422,7 @@ func (m *mockChainView) FilterBlock(blockHash *chainhash.Hash) (*chainview.Filte // a proper notification is sent of to all registered clients. func TestEdgeUpdateNotification(t *testing.T) { t.Parallel() + ctxb := context.Background() ctx := createTestCtxSingleNode(t, 0) @@ -464,7 +465,7 @@ func TestEdgeUpdateNotification(t *testing.T) { copy(edge.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -483,10 +484,10 @@ func TestEdgeUpdateNotification(t *testing.T) { require.NoError(t, err, "unable to create a random chan policy") edge2.ChannelFlags = 1 - if err := ctx.builder.UpdateEdge(edge1); err != nil { + if err := ctx.builder.UpdateEdge(ctxb, edge1); err != nil { t.Fatalf("unable to add edge update: %v", err) } - if err := ctx.builder.UpdateEdge(edge2); err != nil { + if err := ctx.builder.UpdateEdge(ctxb, edge2); err != nil { t.Fatalf("unable to add edge update: %v", err) } @@ -657,7 +658,7 @@ func TestNodeUpdateNotification(t *testing.T) { // Adding the edge will add the nodes to the graph, but with no info // except the pubkey known. - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -844,7 +845,7 @@ func TestNotificationCancellation(t *testing.T) { } copy(edge.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -875,6 +876,7 @@ func TestNotificationCancellation(t *testing.T) { // properly dispatched to all registered clients. func TestChannelCloseNotification(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxSingleNode(t, startingBlockHeight) @@ -918,7 +920,7 @@ func TestChannelCloseNotification(t *testing.T) { } copy(edge.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -1061,7 +1063,8 @@ func createTestCtxSingleNode(t *testing.T, sourceNode := createTestNode(t) require.NoError(t, - graph.SetSourceNode(sourceNode), "failed to set source node", + graph.SetSourceNode(context.Background(), sourceNode), + "failed to set source node", ) graphInstance := &testGraphInstance{ @@ -1076,7 +1079,7 @@ func createTestCtxSingleNode(t *testing.T, func (c *testCtx) RestartBuilder(t *testing.T) { c.chainView.Reset() - selfNode, err := c.graph.SourceNode() + selfNode, err := c.graph.SourceNode(context.Background()) require.NoError(t, err) // With the chainView reset, we'll now re-create the builder itself, and @@ -1149,7 +1152,7 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, ConfChan: make(chan *chainntnfs.TxConfirmation), } - selfnode, err := graphInstance.graph.SourceNode() + selfnode, err := graphInstance.graph.SourceNode(context.Background()) require.NoError(t, err) graphBuilder, err := NewBuilder(&Config{ diff --git a/lnd.go b/lnd.go index 3afa8c2fba4..76b08a114a1 100644 --- a/lnd.go +++ b/lnd.go @@ -663,9 +663,9 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg, // Now we have created all dependencies necessary to populate and // start the RPC server. err = rpcServer.addDeps( - server, interceptorChain.MacaroonService(), cfg.SubRPCServers, - atplManager, server.invoices, tower, multiAcceptor, - server.invoiceHtlcModifier, + ctx, server, interceptorChain.MacaroonService(), + cfg.SubRPCServers, atplManager, server.invoices, tower, + multiAcceptor, server.invoiceHtlcModifier, ) if err != nil { return mkErr("unable to add deps to RPC server", err) diff --git a/lnrpc/devrpc/dev_server.go b/lnrpc/devrpc/dev_server.go index a28a2500a1e..d2511886492 100644 --- a/lnrpc/devrpc/dev_server.go +++ b/lnrpc/devrpc/dev_server.go @@ -293,7 +293,7 @@ func (s *Server) ImportGraph(ctx context.Context, } edge.ChannelPoint = *channelPoint - if err := graphDB.AddChannelEdge(edge); err != nil { + if err := graphDB.AddChannelEdge(ctx, edge); err != nil { return nil, fmt.Errorf("unable to add edge %v: %w", rpcEdge.ChanPoint, err) } @@ -331,7 +331,8 @@ func (s *Server) ImportGraph(ctx context.Context, if rpcEdge.Node1Policy != nil { policy := makePolicy(rpcEdge.Node1Policy) policy.ChannelFlags = 0 - if err := graphDB.UpdateEdgePolicy(policy); err != nil { + err := graphDB.UpdateEdgePolicy(ctx, policy) + if err != nil { return nil, fmt.Errorf( "unable to update policy: %v", err) } @@ -340,7 +341,8 @@ func (s *Server) ImportGraph(ctx context.Context, if rpcEdge.Node2Policy != nil { policy := makePolicy(rpcEdge.Node2Policy) policy.ChannelFlags = 1 - if err := graphDB.UpdateEdgePolicy(policy); err != nil { + err := graphDB.UpdateEdgePolicy(ctx, policy) + if err != nil { return nil, fmt.Errorf( "unable to update policy: %v", err) } diff --git a/lnrpc/peersrpc/config_active.go b/lnrpc/peersrpc/config_active.go index 4a2f028e454..6b9ef6d2607 100644 --- a/lnrpc/peersrpc/config_active.go +++ b/lnrpc/peersrpc/config_active.go @@ -4,6 +4,7 @@ package peersrpc import ( + "context" "net" "github.com/lightningnetwork/lnd/lnwire" @@ -27,6 +28,7 @@ type Config struct { // setting the feature vector provided and applying the // NodeAnnModifiers. If no feature updates are required, a nil feature // vector should be provided. - UpdateNodeAnnouncement func(features *lnwire.RawFeatureVector, + UpdateNodeAnnouncement func(ctx context.Context, + features *lnwire.RawFeatureVector, mods ...netann.NodeAnnModifier) error } diff --git a/lnrpc/peersrpc/peers_server.go b/lnrpc/peersrpc/peers_server.go index 27dfa347036..7bc68ca2b4f 100644 --- a/lnrpc/peersrpc/peers_server.go +++ b/lnrpc/peersrpc/peers_server.go @@ -306,7 +306,7 @@ func (s *Server) updateFeatures(currentfeatures *lnwire.RawFeatureVector, // UpdateNodeAnnouncement allows the caller to update the node parameters // and broadcasts a new version of the node announcement to its peers. -func (s *Server) UpdateNodeAnnouncement(_ context.Context, +func (s *Server) UpdateNodeAnnouncement(ctx context.Context, req *NodeAnnouncementUpdateRequest) ( *NodeAnnouncementUpdateResponse, error) { @@ -393,7 +393,7 @@ func (s *Server) UpdateNodeAnnouncement(_ context.Context, } if err := s.cfg.UpdateNodeAnnouncement( - nodeAnnFeatures, nodeModifiers..., + ctx, nodeAnnFeatures, nodeModifiers..., ); err != nil { return nil, err } diff --git a/routing/localchans/manager.go b/routing/localchans/manager.go index 492f8f18c28..a75758e88f3 100644 --- a/routing/localchans/manager.go +++ b/routing/localchans/manager.go @@ -2,6 +2,7 @@ package localchans import ( "bytes" + "context" "errors" "fmt" "sync" @@ -48,7 +49,7 @@ type Manager struct { error) // AddEdge is used to add edge/channel to the topology of the router. - AddEdge func(edge *models.ChannelEdgeInfo) error + AddEdge func(ctx context.Context, edge *models.ChannelEdgeInfo) error // policyUpdateLock ensures that the database and the link do not fall // out of sync if there are concurrent fee update calls. Without it, @@ -60,7 +61,8 @@ type Manager struct { // UpdatePolicy updates the policy for the specified channels on disk and in // the active links. -func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, +func (r *Manager) UpdatePolicy(ctx context.Context, + newSchema routing.ChannelPolicy, createMissingEdge bool, chanPoints ...wire.OutPoint) ( []*lnrpc.FailedUpdate, error) { @@ -192,7 +194,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, channel.FundingOutpoint.String()) info, edge, failedUpdate := r.createMissingEdge( - channel, newSchema, + ctx, channel, newSchema, ) if failedUpdate == nil { err = processChan(info, edge) @@ -234,7 +236,8 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, return failedUpdates, nil } -func (r *Manager) createMissingEdge(channel *channeldb.OpenChannel, +func (r *Manager) createMissingEdge(ctx context.Context, + channel *channeldb.OpenChannel, newSchema routing.ChannelPolicy) (*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *lnrpc.FailedUpdate) { @@ -264,7 +267,7 @@ func (r *Manager) createMissingEdge(channel *channeldb.OpenChannel, // Insert the edge into the database to avoid `edge not // found` errors during policy update propagation. - err = r.AddEdge(info) + err = r.AddEdge(ctx, info) if err != nil { log.Errorf("Attempt to add missing edge for "+ "channel (%s) errored with: %v", diff --git a/routing/localchans/manager_test.go b/routing/localchans/manager_test.go index d428a846bea..40da6fd0407 100644 --- a/routing/localchans/manager_test.go +++ b/routing/localchans/manager_test.go @@ -1,6 +1,7 @@ package localchans import ( + "context" "encoding/hex" "testing" "time" @@ -163,7 +164,7 @@ func TestManager(t *testing.T) { }, nil } - addEdge := func(edge *models.ChannelEdgeInfo) error { + addEdge := func(_ context.Context, _ *models.ChannelEdgeInfo) error { return nil } @@ -314,7 +315,9 @@ func TestManager(t *testing.T) { channelSet = test.channelSet expectedNumUpdates = test.expectedNumUpdates - failedUpdates, err := manager.UpdatePolicy(test.newPolicy, + failedUpdates, err := manager.UpdatePolicy( + context.Background(), + test.newPolicy, test.createMissingEdge, test.specifiedChanPoints...) diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go index 0c483f0d9c9..e1962f5d9d5 100644 --- a/routing/pathfind_test.go +++ b/routing/pathfind_test.go @@ -300,7 +300,7 @@ func parseTestGraph(t *testing.T, useCache bool, path string) ( if source != nil { // Set the selected source node - if err := graph.SetSourceNode(source); err != nil { + if err := graph.SetSourceNode(ctx, source); err != nil { return nil, err } } @@ -356,7 +356,7 @@ func parseTestGraph(t *testing.T, useCache bool, path string) ( ), } - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) if err != nil && !errors.Is(err, graphdb.ErrEdgeAlreadyExist) { return nil, err } @@ -381,7 +381,7 @@ func parseTestGraph(t *testing.T, useCache bool, path string) ( FeeProportionalMillionths: lnwire.MilliSatoshi(edge.FeeRate), ToNode: targetNode, } - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edgePolicy); err != nil { return nil, err } @@ -590,7 +590,7 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, ) require.NoError(t, err) - if err = graph.SetSourceNode(dbNode); err != nil { + if err = graph.SetSourceNode(ctx, dbNode); err != nil { return nil, err } @@ -666,7 +666,7 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, BitcoinKey2Bytes: node2Vertex, } - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) if err != nil && !errors.Is(err, graphdb.ErrEdgeAlreadyExist) { return nil, err } @@ -719,7 +719,8 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, InboundFee: getInboundFees(node1), //nolint:ll ExtraOpaqueData: getExtraData(node1), } - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + err := graph.UpdateEdgePolicy(ctx, edgePolicy) + if err != nil { return nil, err } } @@ -750,7 +751,8 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, InboundFee: getInboundFees(node2), //nolint:ll ExtraOpaqueData: getExtraData(node2), } - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + err := graph.UpdateEdgePolicy(ctx, edgePolicy) + if err != nil { return nil, err } } @@ -1067,11 +1069,12 @@ func runBasicGraphPathFinding(t *testing.T, useCache bool) { func testBasicGraphPathFindingCase(t *testing.T, graphInstance *testGraphInstance, test *basicGraphPathFindingTestCase) { + ctx := context.Background() aliases := graphInstance.aliasMap expectedHops := test.expectedHops expectedHopCount := len(expectedHops) - sourceNode, err := graphInstance.graph.SourceNode() + sourceNode, err := graphInstance.graph.SourceNode(ctx) require.NoError(t, err, "unable to fetch source node") sourceVertex := route.Vertex(sourceNode.PubKeyBytes) @@ -1211,7 +1214,9 @@ func runPathFindingWithAdditionalEdges(t *testing.T, useCache bool) { graph, err := parseTestGraph(t, useCache, basicGraphFilePath) require.NoError(t, err, "unable to create graph") - sourceNode, err := graph.graph.SourceNode() + ctx := context.Background() + + sourceNode, err := graph.graph.SourceNode(ctx) require.NoError(t, err, "unable to fetch source node") paymentAmt := lnwire.NewMSatFromSatoshis(100) @@ -1294,7 +1299,9 @@ func runPathFindingWithBlindedPathDuplicateHop(t *testing.T, useCache bool) { graph, err := parseTestGraph(t, useCache, basicGraphFilePath) require.NoError(t, err, "unable to create graph") - sourceNode, err := graph.graph.SourceNode() + ctx := context.Background() + + sourceNode, err := graph.graph.SourceNode(ctx) require.NoError(t, err, "unable to fetch source node") paymentAmt := lnwire.NewMSatFromSatoshis(100) @@ -1779,7 +1786,9 @@ func runPathNotAvailable(t *testing.T, useCache bool) { graph, err := parseTestGraph(t, useCache, basicGraphFilePath) require.NoError(t, err, "unable to create graph") - sourceNode, err := graph.graph.SourceNode() + ctx := context.Background() + + sourceNode, err := graph.graph.SourceNode(ctx) require.NoError(t, err, "unable to fetch source node") // With the test graph loaded, we'll test that queries for target that @@ -1835,7 +1844,7 @@ func runDestTLVGraphFallback(t *testing.T, useCache bool) { ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef") - sourceNode, err := ctx.graph.SourceNode() + sourceNode, err := ctx.graph.SourceNode(context.Background()) require.NoError(t, err, "unable to fetch source node") find := func(r *RestrictParams, @@ -2053,7 +2062,8 @@ func runPathInsufficientCapacity(t *testing.T, useCache bool) { graph, err := parseTestGraph(t, useCache, basicGraphFilePath) require.NoError(t, err, "unable to create graph") - sourceNode, err := graph.graph.SourceNode() + ctx := context.Background() + sourceNode, err := graph.graph.SourceNode(ctx) require.NoError(t, err, "unable to fetch source node") // Next, test that attempting to find a path in which the current @@ -2083,7 +2093,8 @@ func runRouteFailMinHTLC(t *testing.T, useCache bool) { graph, err := parseTestGraph(t, useCache, basicGraphFilePath) require.NoError(t, err, "unable to create graph") - sourceNode, err := graph.graph.SourceNode() + ctx := context.Background() + sourceNode, err := graph.graph.SourceNode(ctx) require.NoError(t, err, "unable to fetch source node") // We'll not attempt to route an HTLC of 10 SAT from roasbeef to Son @@ -2146,9 +2157,8 @@ func runRouteFailMaxHTLC(t *testing.T, useCache bool) { require.NoError(t, err, "unable to fetch channel edges by ID") midEdge.MessageFlags = 1 midEdge.MaxHTLC = payAmt - 1 - if err := graph.UpdateEdgePolicy(midEdge); err != nil { - t.Fatalf("unable to update edge: %v", err) - } + err = graph.UpdateEdgePolicy(context.Background(), midEdge) + require.NoError(t, err) // We'll now attempt to route through that edge with a payment above // 100k msat, which should fail. @@ -2167,7 +2177,8 @@ func runRouteFailDisabledEdge(t *testing.T, useCache bool) { graph, err := parseTestGraph(t, useCache, basicGraphFilePath) require.NoError(t, err, "unable to create graph") - sourceNode, err := graph.graph.SourceNode() + ctx := context.Background() + sourceNode, err := graph.graph.SourceNode(ctx) require.NoError(t, err, "unable to fetch source node") // First, we'll try to route from roasbeef -> sophon. This should @@ -2188,11 +2199,11 @@ func runRouteFailDisabledEdge(t *testing.T, useCache bool) { _, e1, e2, err := graph.graph.FetchChannelEdgesByID(roasToPham) require.NoError(t, err, "unable to fetch edge") e1.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.graph.UpdateEdgePolicy(e1); err != nil { + if err := graph.graph.UpdateEdgePolicy(ctx, e1); err != nil { t.Fatalf("unable to update edge: %v", err) } e2.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.graph.UpdateEdgePolicy(e2); err != nil { + if err := graph.graph.UpdateEdgePolicy(ctx, e2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -2209,7 +2220,7 @@ func runRouteFailDisabledEdge(t *testing.T, useCache bool) { _, e, _, err := graph.graph.FetchChannelEdgesByID(phamToSophon) require.NoError(t, err, "unable to fetch edge") e.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.graph.UpdateEdgePolicy(e); err != nil { + if err := graph.graph.UpdateEdgePolicy(ctx, e); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -2232,7 +2243,8 @@ func runPathSourceEdgesBandwidth(t *testing.T, useCache bool) { graph, err := parseTestGraph(t, useCache, basicGraphFilePath) require.NoError(t, err, "unable to create graph") - sourceNode, err := graph.graph.SourceNode() + ctx := context.Background() + sourceNode, err := graph.graph.SourceNode(ctx) require.NoError(t, err, "unable to fetch source node") // First, we'll try to route from roasbeef -> sophon. This should @@ -2290,11 +2302,11 @@ func runPathSourceEdgesBandwidth(t *testing.T, useCache bool) { _, e1, e2, err := graph.graph.FetchChannelEdgesByID(roasToSongoku) require.NoError(t, err, "unable to fetch edge") e1.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.graph.UpdateEdgePolicy(e1); err != nil { + if err := graph.graph.UpdateEdgePolicy(ctx, e1); err != nil { t.Fatalf("unable to update edge: %v", err) } e2.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.graph.UpdateEdgePolicy(e2); err != nil { + if err := graph.graph.UpdateEdgePolicy(ctx, e2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3162,7 +3174,9 @@ func newPathFindingTestContext(t *testing.T, useCache bool, ) require.NoError(t, err, "unable to create graph") - sourceNode, err := testGraphInstance.graph.SourceNode() + sourceNode, err := testGraphInstance.graph.SourceNode( + context.Background(), + ) require.NoError(t, err, "unable to fetch source node") ctx := &pathFindingTestContext{ @@ -3233,7 +3247,8 @@ func dbFindPath(graph *graphdb.ChannelGraph, source, target route.Vertex, amt lnwire.MilliSatoshi, timePref float64, finalHtlcExpiry int32) ([]*unifiedEdge, error) { - sourceNode, err := graph.SourceNode() + ctx := context.Background() + sourceNode, err := graph.SourceNode(ctx) if err != nil { return nil, err } @@ -3264,7 +3279,7 @@ func dbFindPath(graph *graphdb.ChannelGraph, func dbFindBlindedPaths(graph *graphdb.ChannelGraph, restrictions *blindedPathRestrictions) ([][]blindedHop, error) { - sourceNode, err := graph.SourceNode() + sourceNode, err := graph.SourceNode(context.Background()) if err != nil { return nil, err } diff --git a/routing/router_test.go b/routing/router_test.go index 04f42c83d7a..3394e2427db 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -133,7 +133,7 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, ) require.NoError(t, err) - sourceNode, err := graphInstance.graph.SourceNode() + sourceNode, err := graphInstance.graph.SourceNode(context.Background()) require.NoError(t, err) sessionSource := &SessionSource{ GraphSessionFactory: graphInstance.graph, @@ -1203,7 +1203,7 @@ func TestFindPathFeeWeighting(t *testing.T) { var preImage [32]byte copy(preImage[:], bytes.Repeat([]byte{9}, 32)) - sourceNode, err := ctx.graph.SourceNode() + sourceNode, err := ctx.graph.SourceNode(context.Background()) require.NoError(t, err, "unable to fetch source node") amt := lnwire.MilliSatoshi(100) @@ -2744,7 +2744,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { BitcoinKey2Bytes: pub2, AuthProof: nil, } - require.NoError(t, ctx.graph.AddChannelEdge(edge)) + require.NoError(t, ctx.graph.AddChannelEdge(ctxb, edge)) // We must add the edge policy to be able to use the edge for route // finding. @@ -2760,7 +2760,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { } edgePolicy.ChannelFlags = 0 - require.NoError(t, ctx.graph.UpdateEdgePolicy(edgePolicy)) + require.NoError(t, ctx.graph.UpdateEdgePolicy(ctxb, edgePolicy)) // Create edge in the other direction as well. edgePolicy = &models.ChannelEdgePolicy{ @@ -2775,7 +2775,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { } edgePolicy.ChannelFlags = 1 - require.NoError(t, ctx.graph.UpdateEdgePolicy(edgePolicy)) + require.NoError(t, ctx.graph.UpdateEdgePolicy(ctxb, edgePolicy)) // After adding the edge between the two previously unknown nodes, they // should have been added to the graph. @@ -2824,7 +2824,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { copy(edge.BitcoinKey1Bytes[:], node1Bytes) edge.BitcoinKey2Bytes = node2Bytes - require.NoError(t, ctx.graph.AddChannelEdge(edge)) + require.NoError(t, ctx.graph.AddChannelEdge(ctxb, edge)) edgePolicy = &models.ChannelEdgePolicy{ SigBytes: testSig.Serialize(), @@ -2838,7 +2838,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { } edgePolicy.ChannelFlags = 0 - require.NoError(t, ctx.graph.UpdateEdgePolicy(edgePolicy)) + require.NoError(t, ctx.graph.UpdateEdgePolicy(ctxb, edgePolicy)) edgePolicy = &models.ChannelEdgePolicy{ SigBytes: testSig.Serialize(), @@ -2852,7 +2852,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { } edgePolicy.ChannelFlags = 1 - require.NoError(t, ctx.graph.UpdateEdgePolicy(edgePolicy)) + require.NoError(t, ctx.graph.UpdateEdgePolicy(ctxb, edgePolicy)) // We should now be able to find a route to node 2. paymentAmt := lnwire.NewMSatFromSatoshis(100) @@ -2943,7 +2943,9 @@ type mockGraphBuilder struct { func newMockGraphBuilder(graph graph.DB) *mockGraphBuilder { return &mockGraphBuilder{ updateEdge: func(update *models.ChannelEdgePolicy) error { - return graph.UpdateEdgePolicy(update) + return graph.UpdateEdgePolicy( + context.Background(), update, + ) }, } } diff --git a/rpcserver.go b/rpcserver.go index 3c72d3459f8..7140731a6f9 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -681,14 +681,15 @@ func newRPCServer(cfg *Config, interceptorChain *rpcperms.InterceptorChain, // addDeps populates all dependencies needed by the RPC server, and any // of the sub-servers that it maintains. When this is done, the RPC server can // be started, and start accepting RPC calls. -func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, +func (r *rpcServer) addDeps(ctx context.Context, s *server, + macService *macaroons.Service, subServerCgs *subRPCServerConfigs, atpl *autopilot.Manager, invoiceRegistry *invoices.InvoiceRegistry, tower *watchtower.Standalone, chanPredicate chanacceptor.MultiplexAcceptor, invoiceHtlcModifier *invoices.HtlcModificationInterceptor) error { // Set up router rpc backend. - selfNode, err := s.graphDB.SourceNode() + selfNode, err := s.graphDB.SourceNode(ctx) if err != nil { return err } @@ -4644,7 +4645,7 @@ func (r *rpcServer) ListChannels(ctx context.Context, // our list depending on the type of channels requested to us. isActive := peerOnline && linkActive channel, err := createRPCOpenChannel( - r, dbChannel, isActive, in.PeerAliasLookup, + ctx, r, dbChannel, isActive, in.PeerAliasLookup, ) if err != nil { return nil, err @@ -4760,7 +4761,10 @@ func encodeCustomChanData(lnChan *channeldb.OpenChannel) ([]byte, error) { } // createRPCOpenChannel creates an *lnrpc.Channel from the *channeldb.Channel. -func createRPCOpenChannel(r *rpcServer, dbChannel *channeldb.OpenChannel, +// +//nolint:funlen +func createRPCOpenChannel(ctx context.Context, r *rpcServer, + dbChannel *channeldb.OpenChannel, isActive, peerAliasLookup bool) (*lnrpc.Channel, error) { nodePub := dbChannel.IdentityPub @@ -4862,7 +4866,7 @@ func createRPCOpenChannel(r *rpcServer, dbChannel *channeldb.OpenChannel, // Look up our channel peer's node alias if the caller requests it. if peerAliasLookup { - peerAlias, err := r.server.graphDB.LookupAlias(nodePub) + peerAlias, err := r.server.graphDB.LookupAlias(ctx, nodePub) if err != nil { peerAlias = fmt.Sprintf("unable to lookup "+ "peer alias: %v", err) @@ -5306,7 +5310,8 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription, } case channelnotifier.OpenChannelEvent: channel, err := createRPCOpenChannel( - r, event.Channel, true, false, + updateStream.Context(), r, + event.Channel, true, false, ) if err != nil { return err @@ -7653,7 +7658,7 @@ func (r *rpcServer) FeeReport(ctx context.Context, _ *lnrpc.FeeReportRequest) (*lnrpc.FeeReportResponse, error) { channelGraph := r.server.graphDB - selfNode, err := channelGraph.SourceNode() + selfNode, err := channelGraph.SourceNode(ctx) if err != nil { return nil, err } @@ -7931,8 +7936,9 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context, // With the scope resolved, we'll now send this to the local channel // manager so it can propagate the new policy for our target channel(s). - failedUpdates, err := r.server.localChanMgr.UpdatePolicy(chanPolicy, - req.CreateMissingEdge, targetChans...) + failedUpdates, err := r.server.localChanMgr.UpdatePolicy( + ctx, chanPolicy, req.CreateMissingEdge, targetChans..., + ) if err != nil { return nil, err } diff --git a/server.go b/server.go index 2c985b2cc66..677e978ae77 100644 --- a/server.go +++ b/server.go @@ -970,7 +970,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, // Finally, we'll update the representation on disk, and update our // cached in-memory version as well. - if err := dbs.GraphDB.SetSourceNode(selfNode); err != nil { + if err := dbs.GraphDB.SetSourceNode(ctx, selfNode); err != nil { return nil, fmt.Errorf("can't set self node: %w", err) } s.currentNodeAnn = nodeAnn @@ -1070,7 +1070,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, MinProbability: routingConfig.MinRouteProbability, } - sourceNode, err := dbs.GraphDB.SourceNode() + sourceNode, err := dbs.GraphDB.SourceNode(ctx) if err != nil { return nil, fmt.Errorf("error getting source node: %w", err) } @@ -1224,8 +1224,10 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate, UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies, FetchChannel: s.chanStateDB.FetchChannel, - AddEdge: func(edge *models.ChannelEdgeInfo) error { - return s.graphBuilder.AddEdge(edge) + AddEdge: func(ctx context.Context, + edge *models.ChannelEdgeInfo) error { + + return s.graphBuilder.AddEdge(ctx, edge) }, } @@ -2073,7 +2075,11 @@ func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl, func() error { return healthcheck.CheckTorServiceStatus( s.torController, - s.createNewHiddenService, + func() error { + return s.createNewHiddenService( + context.TODO(), + ) + }, ) }, cfg.HealthChecks.TorConnection.Interval, @@ -2467,7 +2473,7 @@ func (s *server) Start(ctx context.Context) error { if s.torController != nil { cleanup = cleanup.add(s.torController.Stop) - if err := s.createNewHiddenService(); err != nil { + if err := s.createNewHiddenService(ctx); err != nil { startErr = err return } @@ -3325,7 +3331,7 @@ func (s *server) initialPeerBootstrap(ctx context.Context, // createNewHiddenService automatically sets up a v2 or v3 onion service in // order to listen for inbound connections over Tor. -func (s *server) createNewHiddenService() error { +func (s *server) createNewHiddenService(ctx context.Context) error { // Determine the different ports the server is listening on. The onion // service's virtual port will map to these ports and one will be picked // at random when the onion service is being accessed. @@ -3390,7 +3396,7 @@ func (s *server) createNewHiddenService() error { AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(), } copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed()) - if err := s.graphDB.SetSourceNode(selfNode); err != nil { + if err := s.graphDB.SetSourceNode(ctx, selfNode); err != nil { return fmt.Errorf("can't set self node: %w", err) } @@ -3485,7 +3491,8 @@ func (s *server) genNodeAnnouncement(features *lnwire.RawFeatureVector, // applying the giving modifiers and updating the time stamp // to ensure it propagates through the network. Then it broadcasts // it to the network. -func (s *server) updateAndBroadcastSelfNode(features *lnwire.RawFeatureVector, +func (s *server) updateAndBroadcastSelfNode(ctx context.Context, + features *lnwire.RawFeatureVector, modifiers ...netann.NodeAnnModifier) error { newNodeAnn, err := s.genNodeAnnouncement(features, modifiers...) @@ -3497,7 +3504,7 @@ func (s *server) updateAndBroadcastSelfNode(features *lnwire.RawFeatureVector, // Update the on-disk version of our announcement. // Load and modify self node istead of creating anew instance so we // don't risk overwriting any existing values. - selfNode, err := s.graphDB.SourceNode() + selfNode, err := s.graphDB.SourceNode(ctx) if err != nil { return fmt.Errorf("unable to get current source node: %w", err) } @@ -3512,7 +3519,7 @@ func (s *server) updateAndBroadcastSelfNode(features *lnwire.RawFeatureVector, copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed()) - if err := s.graphDB.SetSourceNode(selfNode); err != nil { + if err := s.graphDB.SetSourceNode(ctx, selfNode); err != nil { return fmt.Errorf("can't set self node: %w", err) } diff --git a/subrpcserver_config.go b/subrpcserver_config.go index 6a7ba2a4d61..5034fcc405f 100644 --- a/subrpcserver_config.go +++ b/subrpcserver_config.go @@ -1,6 +1,7 @@ package lnd import ( + "context" "fmt" "net" "reflect" @@ -122,7 +123,8 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config, genInvoiceFeatures func() *lnwire.FeatureVector, genAmpInvoiceFeatures func() *lnwire.FeatureVector, getNodeAnnouncement func() lnwire.NodeAnnouncement, - updateNodeAnnouncement func(features *lnwire.RawFeatureVector, + updateNodeAnnouncement func(ctx context.Context, + features *lnwire.RawFeatureVector, modifiers ...netann.NodeAnnModifier) error, parseAddr func(addr string) (net.Addr, error), rpcLogger btclog.Logger, aliasMgr *aliasmgr.Manager,