Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions autopilot/prefattach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey,
Capacity: capacity,
}
edge.AddNodeKeys(lnNode1, lnNode2, lnNode1, lnNode2)
if err := d.db.AddChannelEdge(edge); err != nil {
if err := d.db.AddChannelEdge(ctx, edge); err != nil {
return nil, nil, err
}
edgePolicy := &models.ChannelEdgePolicy{
Expand All @@ -504,7 +504,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey,
ChannelFlags: 0,
}

if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil {
if err := d.db.UpdateEdgePolicy(ctx, edgePolicy); err != nil {
return nil, nil, err
}
edgePolicy = &models.ChannelEdgePolicy{
Expand All @@ -519,7 +519,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey,
MessageFlags: 1,
ChannelFlags: 1,
}
if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil {
if err := d.db.UpdateEdgePolicy(ctx, edgePolicy); err != nil {
return nil, nil, err
}

Expand Down
10 changes: 7 additions & 3 deletions discovery/chan_series.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package discovery

import (
"context"
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
Expand All @@ -22,7 +23,8 @@ type ChannelGraphTimeSeries interface {
// height that's close to the current tip of the main chain as we
// know it. We'll use this to start our QueryChannelRange dance with
// the remote node.
HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error)
HighestChanID(ctx context.Context,
chain chainhash.Hash) (*lnwire.ShortChannelID, error)

// UpdatesInHorizon returns all known channel and node updates with an
// update timestamp between the start time and end time. We'll use this
Expand Down Expand Up @@ -87,8 +89,10 @@ func NewChanSeries(graph *graphdb.ChannelGraph) *ChanSeries {
// this to start our QueryChannelRange dance with the remote node.
//
// NOTE: This is part of the ChannelGraphTimeSeries interface.
func (c *ChanSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) {
chanID, err := c.graph.HighestChanID()
func (c *ChanSeries) HighestChanID(ctx context.Context,
_ chainhash.Hash) (*lnwire.ShortChannelID, error) {

chanID, err := c.graph.HighestChanID(ctx)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2288,7 +2288,7 @@ func (d *AuthenticatedGossiper) isMsgStale(_ context.Context,

// updateChannel creates a new fully signed update for the channel, and updates
// the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel(_ context.Context,
func (d *AuthenticatedGossiper) updateChannel(ctx context.Context,
info *models.ChannelEdgeInfo,
edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
*lnwire.ChannelUpdate1, error) {
Expand Down Expand Up @@ -2322,7 +2322,7 @@ func (d *AuthenticatedGossiper) updateChannel(_ context.Context,
}

// Finally, we'll write the new edge policy to disk.
if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
if err := d.cfg.Graph.UpdateEdge(ctx, edge); err != nil {
return nil, nil, err
}

Expand Down Expand Up @@ -2808,7 +2808,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
// We will add the edge to the channel router. If the nodes present in
// this channel are not present in the database, a partial node will be
// added to represent each node while we wait for a node announcement.
err = d.cfg.Graph.AddEdge(edge, ops...)
err = d.cfg.Graph.AddEdge(ctx, edge, ops...)
if err != nil {
log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
scid.ToUint64(), err)
Expand Down Expand Up @@ -3263,7 +3263,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
ExtraOpaqueData: upd.ExtraOpaqueData,
}

if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
if err := d.cfg.Graph.UpdateEdge(ctx, update, ops...); err != nil {
if graph.IsError(
err, graph.ErrOutdated,
graph.ErrIgnored,
Expand Down
8 changes: 4 additions & 4 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func (r *mockGraphSource) IsZombieEdge(chanID lnwire.ShortChannelID) (bool,
return ok, nil
}

func (r *mockGraphSource) AddEdge(info *models.ChannelEdgeInfo,
_ ...batch.SchedulerOption) error {
func (r *mockGraphSource) AddEdge(_ context.Context,
info *models.ChannelEdgeInfo, _ ...batch.SchedulerOption) error {

r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -161,8 +161,8 @@ func (r *mockGraphSource) queueValidationFail(chanID uint64) {
r.chansToReject[chanID] = struct{}{}
}

func (r *mockGraphSource) UpdateEdge(edge *models.ChannelEdgePolicy,
_ ...batch.SchedulerOption) error {
func (r *mockGraphSource) UpdateEdge(_ context.Context,
edge *models.ChannelEdgePolicy, _ ...batch.SchedulerOption) error {

r.mu.Lock()
defer func() {
Expand Down
6 changes: 4 additions & 2 deletions discovery/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,12 +965,14 @@ func (g *GossipSyncer) processChanRangeReply(_ context.Context,
// party when we're kicking off the channel graph synchronization upon
// connection. The historicalQuery boolean can be used to generate a query from
// the genesis block of the chain.
func (g *GossipSyncer) genChanRangeQuery(_ context.Context,
func (g *GossipSyncer) genChanRangeQuery(ctx context.Context,
historicalQuery bool) (*lnwire.QueryChannelRange, error) {

// First, we'll query our channel graph time series for its highest
// known channel ID.
newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
newestChan, err := g.cfg.channelSeries.HighestChanID(
ctx, g.cfg.chainHash,
)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion discovery/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,12 @@ func newMockChannelGraphTimeSeries(
}
}

func (m *mockChannelGraphTimeSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) {
func (m *mockChannelGraphTimeSeries) HighestChanID(_ context.Context,
_ chainhash.Hash) (*lnwire.ShortChannelID, error) {

return &m.highestID, nil
}

func (m *mockChannelGraphTimeSeries) UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {

Expand Down
24 changes: 13 additions & 11 deletions graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,8 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error {
// ApplyChannelUpdate validates a channel update and if valid, applies it to the
// database. It returns a bool indicating whether the updates were successful.
func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
ctx := context.TODO()

ch, _, _, err := b.GetChannelByID(msg.ShortChannelID)
if err != nil {
log.Errorf("Unable to retrieve channel by id: %v", err)
Expand Down Expand Up @@ -959,7 +961,7 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
ExtraOpaqueData: msg.ExtraOpaqueData,
}

err = b.UpdateEdge(update)
err = b.UpdateEdge(ctx, update)
if err != nil && !IsError(err, ErrIgnored, ErrOutdated) {
log.Errorf("Unable to apply channel update: %v", err)
return false
Expand Down Expand Up @@ -1017,10 +1019,10 @@ func (b *Builder) addNode(ctx context.Context, node *models.LightningNode,
// in construction of payment path.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
func (b *Builder) AddEdge(ctx context.Context, edge *models.ChannelEdgeInfo,
op ...batch.SchedulerOption) error {

err := b.addEdge(edge, op...)
err := b.addEdge(ctx, edge, op...)
if err != nil {
logNetworkMsgProcessError(err)

Expand All @@ -1038,7 +1040,7 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
//
// TODO(elle): this currently also does funding-transaction validation. But this
// should be moved to the gossiper instead.
func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
func (b *Builder) addEdge(ctx context.Context, edge *models.ChannelEdgeInfo,
op ...batch.SchedulerOption) error {

log.Debugf("Received ChannelEdgeInfo for channel %v", edge.ChannelID)
Expand All @@ -1061,7 +1063,7 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
edge.ChannelID)
}

if err := b.cfg.Graph.AddChannelEdge(edge, op...); err != nil {
if err := b.cfg.Graph.AddChannelEdge(ctx, edge, op...); err != nil {
return fmt.Errorf("unable to add edge: %w", err)
}

Expand Down Expand Up @@ -1118,10 +1120,10 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
// considered as not fully constructed.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
op ...batch.SchedulerOption) error {
func (b *Builder) UpdateEdge(ctx context.Context,
update *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error {

err := b.updateEdge(update, op...)
err := b.updateEdge(ctx, update, op...)
if err != nil {
logNetworkMsgProcessError(err)

Expand All @@ -1135,8 +1137,8 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
// persisted in the graph, and then applies it to the graph if the update is
// considered fresh enough and if we actually have a channel persisted for the
// given update.
func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,
op ...batch.SchedulerOption) error {
func (b *Builder) updateEdge(ctx context.Context,
policy *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error {

log.Debugf("Received ChannelEdgePolicy for channel %v",
policy.ChannelID)
Expand Down Expand Up @@ -1209,7 +1211,7 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,

// Now that we know this isn't a stale update, we'll apply the new edge
// policy to the proper directional edge within the channel graph.
if err = b.cfg.Graph.UpdateEdgePolicy(policy, op...); err != nil {
if err = b.cfg.Graph.UpdateEdgePolicy(ctx, policy, op...); err != nil {
err := errors.Errorf("unable to add channel: %v", err)
log.Error(err)
return err
Expand Down
Loading