Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 81 additions & 120 deletions discovery/chan_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package discovery

import (
"context"
"iter"
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
Expand Down Expand Up @@ -30,8 +31,8 @@ type ChannelGraphTimeSeries interface {
// update timestamp between the start time and end time. We'll use this
// to catch up a remote node to the set of channel updates that they
// may have missed out on within the target chain.
UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error)
UpdatesInHorizon(chain chainhash.Hash, startTime time.Time,
endTime time.Time) iter.Seq2[lnwire.Message, error]

// FilterKnownChanIDs takes a target chain, and a set of channel ID's,
// and returns a filtered set of chan ID's. This filtered set of chan
Expand Down Expand Up @@ -108,140 +109,100 @@ func (c *ChanSeries) HighestChanID(ctx context.Context,
//
// NOTE: This is part of the ChannelGraphTimeSeries interface.
func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {
startTime, endTime time.Time) iter.Seq2[lnwire.Message, error] {

var updates []lnwire.Message

// First, we'll query for all the set of channels that have an update
// that falls within the specified horizon.
chansInHorizon, err := c.graph.ChanUpdatesInHorizon(
startTime, endTime,
)
if err != nil {
return nil, err
}

// nodesFromChan records the nodes seen from the channels.
nodesFromChan := make(map[[33]byte]struct{}, len(chansInHorizon)*2)

for _, channel := range chansInHorizon {
// If the channel hasn't been fully advertised yet, or is a
// private channel, then we'll skip it as we can't construct a
// full authentication proof if one is requested.
if channel.Info.AuthProof == nil {
continue
}

chanAnn, edge1, edge2, err := netann.CreateChanAnnouncement(
channel.Info.AuthProof, channel.Info, channel.Policy1,
channel.Policy2,
return func(yield func(lnwire.Message, error) bool) {
// First, we'll query for all the set of channels that have an
// update that falls within the specified horizon.
chansInHorizon := c.graph.ChanUpdatesInHorizon(
startTime, endTime,
)
if err != nil {
return nil, err
}

// Create a slice to hold the `channel_announcement` and
// potentially two `channel_update` msgs.
//
// NOTE: Based on BOLT7, if a channel_announcement has no
// corresponding channel_updates, we must not send the
// channel_announcement. Thus we use this slice to decide we
// want to send this `channel_announcement` or not. By the end
// of the operation, if the len of the slice is 1, we will not
// send the `channel_announcement`. Otherwise, when sending the
// msgs, the `channel_announcement` must be sent prior to any
// corresponding `channel_update` or `node_annoucement`, that's
// why we create a slice here to maintain the order.
chanUpdates := make([]lnwire.Message, 0, 3)
chanUpdates = append(chanUpdates, chanAnn)

if edge1 != nil {
// We don't want to send channel updates that don't
// conform to the spec (anymore).
err := netann.ValidateChannelUpdateFields(0, edge1)
for channel, err := range chansInHorizon {
if err != nil {
log.Errorf("not sending invalid channel "+
"update %v: %v", edge1, err)
} else {
chanUpdates = append(chanUpdates, edge1)
yield(nil, err)
return
}
}

if edge2 != nil {
err := netann.ValidateChannelUpdateFields(0, edge2)
if err != nil {
log.Errorf("not sending invalid channel "+
"update %v: %v", edge2, err)
} else {
chanUpdates = append(chanUpdates, edge2)
// If the channel hasn't been fully advertised yet, or
// is a private channel, then we'll skip it as we can't
// construct a full authentication proof if one is
// requested.
if channel.Info.AuthProof == nil {
continue
}
}

// If there's no corresponding `channel_update` to send, skip
// sending this `channel_announcement`.
if len(chanUpdates) < 2 {
continue
}

// Append the all the msgs to the slice.
updates = append(updates, chanUpdates...)

// Record the nodes seen.
nodesFromChan[channel.Info.NodeKey1Bytes] = struct{}{}
nodesFromChan[channel.Info.NodeKey2Bytes] = struct{}{}
}

// Next, we'll send out all the node announcements that have an update
// within the horizon as well. We send these second to ensure that they
// follow any active channels they have.
nodeAnnsInHorizon, err := c.graph.NodeUpdatesInHorizon(
startTime, endTime,
)
if err != nil {
return nil, err
}
//nolint:ll
chanAnn, edge1, edge2, err := netann.CreateChanAnnouncement(
channel.Info.AuthProof, channel.Info,
channel.Policy1, channel.Policy2,
)
if err != nil {
if !yield(nil, err) {
return
}

for _, nodeAnn := range nodeAnnsInHorizon {
// If this node has not been seen in the above channels, we can
// skip sending its NodeAnnouncement.
if _, seen := nodesFromChan[nodeAnn.PubKeyBytes]; !seen {
log.Debugf("Skipping forwarding as node %x not found "+
"in channel announcement", nodeAnn.PubKeyBytes)
continue
}
continue
}

// Ensure we only forward nodes that are publicly advertised to
// prevent leaking information about nodes.
isNodePublic, err := c.graph.IsPublicNode(nodeAnn.PubKeyBytes)
if err != nil {
log.Errorf("Unable to determine if node %x is "+
"advertised: %v", nodeAnn.PubKeyBytes, err)
continue
}
if !yield(chanAnn, nil) {
return
}

if !isNodePublic {
log.Tracef("Skipping forwarding announcement for "+
"node %x due to being unadvertised",
nodeAnn.PubKeyBytes)
continue
// We don't want to send channel updates that don't
// conform to the spec (anymore), so check to make sure
// that these channel updates are valid before yielding
// them.
if edge1 != nil {
err := netann.ValidateChannelUpdateFields(
0, edge1,
)
if err != nil {
log.Errorf("not sending invalid "+
"channel update %v: %v",
edge1, err)
} else if !yield(edge1, nil) {
return
}
}
if edge2 != nil {
err := netann.ValidateChannelUpdateFields(
0, edge2,
)
if err != nil {
log.Errorf("not sending invalid "+
"channel update %v: %v", edge2,
err)
} else if !yield(edge2, nil) {
return
}
}
}

nodeUpdate, err := nodeAnn.NodeAnnouncement(true)
if err != nil {
return nil, err
}
// Next, we'll send out all the node announcements that have an
// update within the horizon as well. We send these second to
// ensure that they follow any active channels they have.
nodeAnnsInHorizon := c.graph.NodeUpdatesInHorizon(
startTime, endTime, graphdb.WithIterPublicNodesOnly(),
)
for nodeAnn, err := range nodeAnnsInHorizon {
if err != nil {
yield(nil, err)
return
}
nodeUpdate, err := nodeAnn.NodeAnnouncement(true)
if err != nil {
if !yield(nil, err) {
return
}

if err := netann.ValidateNodeAnnFields(nodeUpdate); err != nil {
log.Debugf("Skipping forwarding invalid node "+
"announcement %x: %v", nodeAnn.PubKeyBytes, err)
continue
}

continue
if !yield(nodeUpdate, nil) {
return
}
}

updates = append(updates, nodeUpdate)
}

return updates, nil
}

// FilterKnownChanIDs takes a target chain, and a set of channel ID's, and
Expand Down
58 changes: 50 additions & 8 deletions discovery/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"iter"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -1442,20 +1443,30 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,

// Now that the remote peer has applied their filter, we'll query the
// database for all the messages that are beyond this filter.
newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
newUpdatestoSend := g.cfg.channelSeries.UpdatesInHorizon(
g.cfg.chainHash, startTime, endTime,
)
if err != nil {

// Create a pull-based iterator so we can check if there are any
// updates before launching the goroutine.
next, stop := iter.Pull2(newUpdatestoSend)

// Check if we have any updates to send by attempting to get the first
// message.
firstMsg, firstErr, ok := next()
if firstErr != nil {
stop()
returnSema()
return err
return firstErr
}

log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
"start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
startTime, endTime, len(newUpdatestoSend))
"start=%v, end=%v, has_updates=%v", g.cfg.peerPub[:],
startTime, endTime, ok)

// If we don't have any to send, then we can return early.
if len(newUpdatestoSend) == 0 {
if !ok {
stop()
returnSema()
return nil
}
Expand All @@ -1472,14 +1483,45 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
}

// We'll conclude by launching a goroutine to send out any updates.
// The goroutine takes ownership of the iterator.
g.cg.WgAdd(1)
go func() {
defer g.cg.WgDone()
defer returnSema()
defer g.isSendingBacklog.Store(false)
defer stop()

// Send the first message we already pulled.
err := g.sendToPeerSync(ctx, firstMsg)
switch {
case errors.Is(err, ErrGossipSyncerExiting):
return

case errors.Is(err, lnpeer.ErrPeerExiting):
return

case err != nil:
log.Errorf("Unable to send message for "+
"peer catch up: %v", err)
}

// Continue with the rest of the messages using the same pull
// iterator.
for {
msg, err, ok := next()
if !ok {
return
}

// If the iterator yielded an error, log it and
// continue.
if err != nil {
log.Errorf("Error fetching update for peer "+
"catch up: %v", err)
continue
}

for _, msg := range newUpdatestoSend {
err := g.sendToPeerSync(ctx, msg)
err = g.sendToPeerSync(ctx, msg)
switch {
case err == ErrGossipSyncerExiting:
return
Expand Down
20 changes: 15 additions & 5 deletions discovery/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"iter"
"math"
"reflect"
"sort"
Expand Down Expand Up @@ -86,13 +87,22 @@ func (m *mockChannelGraphTimeSeries) HighestChanID(_ context.Context,
}

func (m *mockChannelGraphTimeSeries) UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {
startTime, endTime time.Time) iter.Seq2[lnwire.Message, error] {

m.horizonReq <- horizonQuery{
chain, startTime, endTime,
}
return func(yield func(lnwire.Message, error) bool) {
m.horizonReq <- horizonQuery{
chain, startTime, endTime,
}

return <-m.horizonResp, nil
// We'll get the response from the channel, then yield it
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit message doesnt match the diff

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will revisit, had to do a pretty gnarly rebase to got all the commits compiling lol.

// immediately.
msgs := <-m.horizonResp
for _, msg := range msgs {
if !yield(msg, nil) {
return
}
}
}
}

func (m *mockChannelGraphTimeSeries) FilterKnownChanIDs(chain chainhash.Hash,
Expand Down
7 changes: 7 additions & 0 deletions docs/release-notes/release-notes-0.20.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ reader of a payment request.
- [Introduced](https://github.com/lightningnetwork/lnd/pull/10136) a wallet
interface to decouple the relationship between `lnd` and `btcwallet`.

- [Refactored](https://github.com/lightningnetwork/lnd/pull/10128) channel graph
update iterators to use Go's `iter.Seq2` pattern. The `UpdatesInHorizon`,
`NodeUpdatesInHorizon`, and `ChanUpdatesInHorizon` methods now return lazy
iterators instead of materializing all updates in memory at once, improving
memory efficiency for large graph operations.

## Breaking Changes
## Performance Improvements

Expand Down Expand Up @@ -300,6 +306,7 @@ reader of a payment request.
* Erick Cestari
* Funyug
* Mohamed Awnallah
* Olaoluwa Osuntokun
* Pins
* Torkel Rogstad
* Yong Yu
Expand Down
2 changes: 1 addition & 1 deletion fn/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/lightningnetwork/lnd/fn/v2

go 1.19
go 1.23

require (
github.com/stretchr/testify v1.8.1
Expand Down
Loading
Loading