Skip to content

Commit 641456e

Browse files
authored
Merge pull request #10128 from Roasbeef/iter-chan-updates
multi: update ChanUpdatesInHorizon and NodeUpdatesInHorizon to return iterators (iter.Seq[T])
2 parents b09b20c + 506c3d5 commit 641456e

File tree

18 files changed

+1433
-385
lines changed

18 files changed

+1433
-385
lines changed

discovery/chan_series.go

Lines changed: 81 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package discovery
22

33
import (
44
"context"
5+
"iter"
56
"time"
67

78
"github.com/btcsuite/btcd/chaincfg/chainhash"
@@ -30,8 +31,8 @@ type ChannelGraphTimeSeries interface {
3031
// update timestamp between the start time and end time. We'll use this
3132
// to catch up a remote node to the set of channel updates that they
3233
// may have missed out on within the target chain.
33-
UpdatesInHorizon(chain chainhash.Hash,
34-
startTime time.Time, endTime time.Time) ([]lnwire.Message, error)
34+
UpdatesInHorizon(chain chainhash.Hash, startTime time.Time,
35+
endTime time.Time) iter.Seq2[lnwire.Message, error]
3536

3637
// FilterKnownChanIDs takes a target chain, and a set of channel ID's,
3738
// and returns a filtered set of chan ID's. This filtered set of chan
@@ -108,140 +109,100 @@ func (c *ChanSeries) HighestChanID(ctx context.Context,
108109
//
109110
// NOTE: This is part of the ChannelGraphTimeSeries interface.
110111
func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
111-
startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {
112+
startTime, endTime time.Time) iter.Seq2[lnwire.Message, error] {
112113

113-
var updates []lnwire.Message
114-
115-
// First, we'll query for all the set of channels that have an update
116-
// that falls within the specified horizon.
117-
chansInHorizon, err := c.graph.ChanUpdatesInHorizon(
118-
startTime, endTime,
119-
)
120-
if err != nil {
121-
return nil, err
122-
}
123-
124-
// nodesFromChan records the nodes seen from the channels.
125-
nodesFromChan := make(map[[33]byte]struct{}, len(chansInHorizon)*2)
126-
127-
for _, channel := range chansInHorizon {
128-
// If the channel hasn't been fully advertised yet, or is a
129-
// private channel, then we'll skip it as we can't construct a
130-
// full authentication proof if one is requested.
131-
if channel.Info.AuthProof == nil {
132-
continue
133-
}
134-
135-
chanAnn, edge1, edge2, err := netann.CreateChanAnnouncement(
136-
channel.Info.AuthProof, channel.Info, channel.Policy1,
137-
channel.Policy2,
114+
return func(yield func(lnwire.Message, error) bool) {
115+
// First, we'll query for all the set of channels that have an
116+
// update that falls within the specified horizon.
117+
chansInHorizon := c.graph.ChanUpdatesInHorizon(
118+
startTime, endTime,
138119
)
139-
if err != nil {
140-
return nil, err
141-
}
142120

143-
// Create a slice to hold the `channel_announcement` and
144-
// potentially two `channel_update` msgs.
145-
//
146-
// NOTE: Based on BOLT7, if a channel_announcement has no
147-
// corresponding channel_updates, we must not send the
148-
// channel_announcement. Thus we use this slice to decide we
149-
// want to send this `channel_announcement` or not. By the end
150-
// of the operation, if the len of the slice is 1, we will not
151-
// send the `channel_announcement`. Otherwise, when sending the
152-
// msgs, the `channel_announcement` must be sent prior to any
153-
// corresponding `channel_update` or `node_annoucement`, that's
154-
// why we create a slice here to maintain the order.
155-
chanUpdates := make([]lnwire.Message, 0, 3)
156-
chanUpdates = append(chanUpdates, chanAnn)
157-
158-
if edge1 != nil {
159-
// We don't want to send channel updates that don't
160-
// conform to the spec (anymore).
161-
err := netann.ValidateChannelUpdateFields(0, edge1)
121+
for channel, err := range chansInHorizon {
162122
if err != nil {
163-
log.Errorf("not sending invalid channel "+
164-
"update %v: %v", edge1, err)
165-
} else {
166-
chanUpdates = append(chanUpdates, edge1)
123+
yield(nil, err)
124+
return
167125
}
168-
}
169-
170-
if edge2 != nil {
171-
err := netann.ValidateChannelUpdateFields(0, edge2)
172-
if err != nil {
173-
log.Errorf("not sending invalid channel "+
174-
"update %v: %v", edge2, err)
175-
} else {
176-
chanUpdates = append(chanUpdates, edge2)
126+
// If the channel hasn't been fully advertised yet, or
127+
// is a private channel, then we'll skip it as we can't
128+
// construct a full authentication proof if one is
129+
// requested.
130+
if channel.Info.AuthProof == nil {
131+
continue
177132
}
178-
}
179133

180-
// If there's no corresponding `channel_update` to send, skip
181-
// sending this `channel_announcement`.
182-
if len(chanUpdates) < 2 {
183-
continue
184-
}
185-
186-
// Append the all the msgs to the slice.
187-
updates = append(updates, chanUpdates...)
188-
189-
// Record the nodes seen.
190-
nodesFromChan[channel.Info.NodeKey1Bytes] = struct{}{}
191-
nodesFromChan[channel.Info.NodeKey2Bytes] = struct{}{}
192-
}
193-
194-
// Next, we'll send out all the node announcements that have an update
195-
// within the horizon as well. We send these second to ensure that they
196-
// follow any active channels they have.
197-
nodeAnnsInHorizon, err := c.graph.NodeUpdatesInHorizon(
198-
startTime, endTime,
199-
)
200-
if err != nil {
201-
return nil, err
202-
}
134+
//nolint:ll
135+
chanAnn, edge1, edge2, err := netann.CreateChanAnnouncement(
136+
channel.Info.AuthProof, channel.Info,
137+
channel.Policy1, channel.Policy2,
138+
)
139+
if err != nil {
140+
if !yield(nil, err) {
141+
return
142+
}
203143

204-
for _, nodeAnn := range nodeAnnsInHorizon {
205-
// If this node has not been seen in the above channels, we can
206-
// skip sending its NodeAnnouncement.
207-
if _, seen := nodesFromChan[nodeAnn.PubKeyBytes]; !seen {
208-
log.Debugf("Skipping forwarding as node %x not found "+
209-
"in channel announcement", nodeAnn.PubKeyBytes)
210-
continue
211-
}
144+
continue
145+
}
212146

213-
// Ensure we only forward nodes that are publicly advertised to
214-
// prevent leaking information about nodes.
215-
isNodePublic, err := c.graph.IsPublicNode(nodeAnn.PubKeyBytes)
216-
if err != nil {
217-
log.Errorf("Unable to determine if node %x is "+
218-
"advertised: %v", nodeAnn.PubKeyBytes, err)
219-
continue
220-
}
147+
if !yield(chanAnn, nil) {
148+
return
149+
}
221150

222-
if !isNodePublic {
223-
log.Tracef("Skipping forwarding announcement for "+
224-
"node %x due to being unadvertised",
225-
nodeAnn.PubKeyBytes)
226-
continue
151+
// We don't want to send channel updates that don't
152+
// conform to the spec (anymore), so check to make sure
153+
// that these channel updates are valid before yielding
154+
// them.
155+
if edge1 != nil {
156+
err := netann.ValidateChannelUpdateFields(
157+
0, edge1,
158+
)
159+
if err != nil {
160+
log.Errorf("not sending invalid "+
161+
"channel update %v: %v",
162+
edge1, err)
163+
} else if !yield(edge1, nil) {
164+
return
165+
}
166+
}
167+
if edge2 != nil {
168+
err := netann.ValidateChannelUpdateFields(
169+
0, edge2,
170+
)
171+
if err != nil {
172+
log.Errorf("not sending invalid "+
173+
"channel update %v: %v", edge2,
174+
err)
175+
} else if !yield(edge2, nil) {
176+
return
177+
}
178+
}
227179
}
228180

229-
nodeUpdate, err := nodeAnn.NodeAnnouncement(true)
230-
if err != nil {
231-
return nil, err
232-
}
181+
// Next, we'll send out all the node announcements that have an
182+
// update within the horizon as well. We send these second to
183+
// ensure that they follow any active channels they have.
184+
nodeAnnsInHorizon := c.graph.NodeUpdatesInHorizon(
185+
startTime, endTime, graphdb.WithIterPublicNodesOnly(),
186+
)
187+
for nodeAnn, err := range nodeAnnsInHorizon {
188+
if err != nil {
189+
yield(nil, err)
190+
return
191+
}
192+
nodeUpdate, err := nodeAnn.NodeAnnouncement(true)
193+
if err != nil {
194+
if !yield(nil, err) {
195+
return
196+
}
233197

234-
if err := netann.ValidateNodeAnnFields(nodeUpdate); err != nil {
235-
log.Debugf("Skipping forwarding invalid node "+
236-
"announcement %x: %v", nodeAnn.PubKeyBytes, err)
198+
continue
199+
}
237200

238-
continue
201+
if !yield(nodeUpdate, nil) {
202+
return
203+
}
239204
}
240-
241-
updates = append(updates, nodeUpdate)
242205
}
243-
244-
return updates, nil
245206
}
246207

247208
// FilterKnownChanIDs takes a target chain, and a set of channel ID's, and

discovery/syncer.go

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"iter"
78
"math"
89
"math/rand"
910
"sort"
@@ -1442,20 +1443,30 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
14421443

14431444
// Now that the remote peer has applied their filter, we'll query the
14441445
// database for all the messages that are beyond this filter.
1445-
newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
1446+
newUpdatestoSend := g.cfg.channelSeries.UpdatesInHorizon(
14461447
g.cfg.chainHash, startTime, endTime,
14471448
)
1448-
if err != nil {
1449+
1450+
// Create a pull-based iterator so we can check if there are any
1451+
// updates before launching the goroutine.
1452+
next, stop := iter.Pull2(newUpdatestoSend)
1453+
1454+
// Check if we have any updates to send by attempting to get the first
1455+
// message.
1456+
firstMsg, firstErr, ok := next()
1457+
if firstErr != nil {
1458+
stop()
14491459
returnSema()
1450-
return err
1460+
return firstErr
14511461
}
14521462

14531463
log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
1454-
"start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
1455-
startTime, endTime, len(newUpdatestoSend))
1464+
"start=%v, end=%v, has_updates=%v", g.cfg.peerPub[:],
1465+
startTime, endTime, ok)
14561466

14571467
// If we don't have any to send, then we can return early.
1458-
if len(newUpdatestoSend) == 0 {
1468+
if !ok {
1469+
stop()
14591470
returnSema()
14601471
return nil
14611472
}
@@ -1472,14 +1483,45 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
14721483
}
14731484

14741485
// We'll conclude by launching a goroutine to send out any updates.
1486+
// The goroutine takes ownership of the iterator.
14751487
g.cg.WgAdd(1)
14761488
go func() {
14771489
defer g.cg.WgDone()
14781490
defer returnSema()
14791491
defer g.isSendingBacklog.Store(false)
1492+
defer stop()
1493+
1494+
// Send the first message we already pulled.
1495+
err := g.sendToPeerSync(ctx, firstMsg)
1496+
switch {
1497+
case errors.Is(err, ErrGossipSyncerExiting):
1498+
return
1499+
1500+
case errors.Is(err, lnpeer.ErrPeerExiting):
1501+
return
1502+
1503+
case err != nil:
1504+
log.Errorf("Unable to send message for "+
1505+
"peer catch up: %v", err)
1506+
}
1507+
1508+
// Continue with the rest of the messages using the same pull
1509+
// iterator.
1510+
for {
1511+
msg, err, ok := next()
1512+
if !ok {
1513+
return
1514+
}
1515+
1516+
// If the iterator yielded an error, log it and
1517+
// continue.
1518+
if err != nil {
1519+
log.Errorf("Error fetching update for peer "+
1520+
"catch up: %v", err)
1521+
continue
1522+
}
14801523

1481-
for _, msg := range newUpdatestoSend {
1482-
err := g.sendToPeerSync(ctx, msg)
1524+
err = g.sendToPeerSync(ctx, msg)
14831525
switch {
14841526
case err == ErrGossipSyncerExiting:
14851527
return

discovery/syncer_test.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"iter"
78
"math"
89
"reflect"
910
"sort"
@@ -86,13 +87,22 @@ func (m *mockChannelGraphTimeSeries) HighestChanID(_ context.Context,
8687
}
8788

8889
func (m *mockChannelGraphTimeSeries) UpdatesInHorizon(chain chainhash.Hash,
89-
startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {
90+
startTime, endTime time.Time) iter.Seq2[lnwire.Message, error] {
9091

91-
m.horizonReq <- horizonQuery{
92-
chain, startTime, endTime,
93-
}
92+
return func(yield func(lnwire.Message, error) bool) {
93+
m.horizonReq <- horizonQuery{
94+
chain, startTime, endTime,
95+
}
9496

95-
return <-m.horizonResp, nil
97+
// We'll get the response from the channel, then yield it
98+
// immediately.
99+
msgs := <-m.horizonResp
100+
for _, msg := range msgs {
101+
if !yield(msg, nil) {
102+
return
103+
}
104+
}
105+
}
96106
}
97107

98108
func (m *mockChannelGraphTimeSeries) FilterKnownChanIDs(chain chainhash.Hash,

docs/release-notes/release-notes-0.20.0.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,12 @@ reader of a payment request.
219219
- [Introduced](https://github.com/lightningnetwork/lnd/pull/10136) a wallet
220220
interface to decouple the relationship between `lnd` and `btcwallet`.
221221

222+
- [Refactored](https://github.com/lightningnetwork/lnd/pull/10128) channel graph
223+
update iterators to use Go's `iter.Seq2` pattern. The `UpdatesInHorizon`,
224+
`NodeUpdatesInHorizon`, and `ChanUpdatesInHorizon` methods now return lazy
225+
iterators instead of materializing all updates in memory at once, improving
226+
memory efficiency for large graph operations.
227+
222228
## Breaking Changes
223229
## Performance Improvements
224230

@@ -300,6 +306,7 @@ reader of a payment request.
300306
* Erick Cestari
301307
* Funyug
302308
* Mohamed Awnallah
309+
* Olaoluwa Osuntokun
303310
* Pins
304311
* Torkel Rogstad
305312
* Yong Yu

fn/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/lightningnetwork/lnd/fn/v2
22

3-
go 1.19
3+
go 1.23
44

55
require (
66
github.com/stretchr/testify v1.8.1

0 commit comments

Comments
 (0)