Skip to content

Commit 225a694

Browse files
authored
Merge pull request #9695 from ellemouton/graph-clean-up
graph/db: remove various `kvdb` parameters from exported methods
2 parents e214b57 + f6bdb87 commit 225a694

File tree

7 files changed

+170
-96
lines changed

7 files changed

+170
-96
lines changed

autopilot/prefattach_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -736,7 +736,7 @@ func (t *testNodeTx) Node() *models.LightningNode {
736736
func (t *testNodeTx) ForEachChannel(f func(*models.ChannelEdgeInfo,
737737
*models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
738738

739-
return t.db.db.ForEachNodeChannel(t.node.PubKeyBytes, func(_ kvdb.RTx,
739+
return t.db.db.ForEachNodeChannel(t.node.PubKeyBytes, func(
740740
edge *models.ChannelEdgeInfo, policy1,
741741
policy2 *models.ChannelEdgePolicy) error {
742742

graph/builder.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/lightningnetwork/lnd/chainntnfs"
1414
graphdb "github.com/lightningnetwork/lnd/graph/db"
1515
"github.com/lightningnetwork/lnd/graph/db/models"
16-
"github.com/lightningnetwork/lnd/kvdb"
1716
"github.com/lightningnetwork/lnd/lnutils"
1817
"github.com/lightningnetwork/lnd/lnwallet"
1918
"github.com/lightningnetwork/lnd/lnwire"
@@ -1276,8 +1275,7 @@ func (b *Builder) ForAllOutgoingChannels(cb func(*models.ChannelEdgeInfo,
12761275
*models.ChannelEdgePolicy) error) error {
12771276

12781277
return b.cfg.Graph.ForEachNodeChannel(b.cfg.SelfNode,
1279-
func(_ kvdb.RTx, c *models.ChannelEdgeInfo,
1280-
e *models.ChannelEdgePolicy,
1278+
func(c *models.ChannelEdgeInfo, e *models.ChannelEdgePolicy,
12811279
_ *models.ChannelEdgePolicy) error {
12821280

12831281
if e == nil {

graph/db/graph_test.go

Lines changed: 104 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ func createChannelEdge(node1, node2 *models.LightningNode) (
618618
chanID := uint64(prand.Int63())
619619
outpoint := wire.OutPoint{
620620
Hash: rev,
621-
Index: 9,
621+
Index: prand.Uint32(),
622622
}
623623

624624
// Add the new edge to the database, this should proceed without any
@@ -991,6 +991,97 @@ func newEdgePolicy(chanID uint64, updateTime int64) *models.ChannelEdgePolicy {
991991
}
992992
}
993993

994+
// TestForEachSourceNodeChannel tests that the ForEachSourceNodeChannel
995+
// correctly iterates through the channels of the set source node.
996+
func TestForEachSourceNodeChannel(t *testing.T) {
997+
t.Parallel()
998+
999+
graph, err := MakeTestGraph(t)
1000+
require.NoError(t, err, "unable to make test database")
1001+
1002+
// Create a source node (A) and set it as such in the DB.
1003+
nodeA := createTestVertex(t)
1004+
require.NoError(t, graph.SetSourceNode(nodeA))
1005+
1006+
// Now, create a few more nodes (B, C, D) along with some channels
1007+
// between them. We'll create the following graph:
1008+
//
1009+
// A -- B -- D
1010+
// |
1011+
// C
1012+
//
1013+
// The graph includes a channel (B-D) that does not belong to the source
1014+
// node along with 2 channels (A-B and A-C) that do belong to the source
1015+
// node. For the A-B channel, we will let the source node set an
1016+
// outgoing policy but for the A-C channel, we will set only an incoming
1017+
// policy.
1018+
1019+
nodeB := createTestVertex(t)
1020+
nodeC := createTestVertex(t)
1021+
nodeD := createTestVertex(t)
1022+
1023+
abEdge, abPolicy1, abPolicy2 := createChannelEdge(nodeA, nodeB)
1024+
require.NoError(t, graph.AddChannelEdge(abEdge))
1025+
acEdge, acPolicy1, acPolicy2 := createChannelEdge(nodeA, nodeC)
1026+
require.NoError(t, graph.AddChannelEdge(acEdge))
1027+
bdEdge, _, _ := createChannelEdge(nodeB, nodeD)
1028+
require.NoError(t, graph.AddChannelEdge(bdEdge))
1029+
1030+
// Figure out which of the policies returned above are node A's so that
1031+
// we know which to persist.
1032+
//
1033+
// First, set the outgoing policy for the A-B channel.
1034+
abPolicyAOutgoing := abPolicy1
1035+
if !bytes.Equal(abPolicy1.ToNode[:], nodeB.PubKeyBytes[:]) {
1036+
abPolicyAOutgoing = abPolicy2
1037+
}
1038+
require.NoError(t, graph.UpdateEdgePolicy(abPolicyAOutgoing))
1039+
1040+
// Now, set the incoming policy for the A-C channel.
1041+
acPolicyAIncoming := acPolicy1
1042+
if !bytes.Equal(acPolicy1.ToNode[:], nodeA.PubKeyBytes[:]) {
1043+
acPolicyAIncoming = acPolicy2
1044+
}
1045+
require.NoError(t, graph.UpdateEdgePolicy(acPolicyAIncoming))
1046+
1047+
type sourceNodeChan struct {
1048+
otherNode route.Vertex
1049+
havePolicy bool
1050+
}
1051+
1052+
// Put together our expected source node channels.
1053+
expectedSrcChans := map[wire.OutPoint]*sourceNodeChan{
1054+
abEdge.ChannelPoint: {
1055+
otherNode: nodeB.PubKeyBytes,
1056+
havePolicy: true,
1057+
},
1058+
acEdge.ChannelPoint: {
1059+
otherNode: nodeC.PubKeyBytes,
1060+
havePolicy: false,
1061+
},
1062+
}
1063+
1064+
// Now, we'll use the ForEachSourceNodeChannel and assert that it
1065+
// returns the expected data in the call-back.
1066+
err = graph.ForEachSourceNodeChannel(func(chanPoint wire.OutPoint,
1067+
havePolicy bool, otherNode *models.LightningNode) error {
1068+
1069+
require.Contains(t, expectedSrcChans, chanPoint)
1070+
expected := expectedSrcChans[chanPoint]
1071+
1072+
require.Equal(
1073+
t, expected.otherNode[:], otherNode.PubKeyBytes[:],
1074+
)
1075+
require.Equal(t, expected.havePolicy, havePolicy)
1076+
1077+
delete(expectedSrcChans, chanPoint)
1078+
1079+
return nil
1080+
})
1081+
require.NoError(t, err)
1082+
require.Empty(t, expectedSrcChans)
1083+
}
1084+
9941085
func TestGraphTraversal(t *testing.T) {
9951086
t.Parallel()
9961087

@@ -1050,7 +1141,7 @@ func TestGraphTraversal(t *testing.T) {
10501141
numNodeChans := 0
10511142
firstNode, secondNode := nodeList[0], nodeList[1]
10521143
err = graph.ForEachNodeChannel(firstNode.PubKeyBytes,
1053-
func(_ kvdb.RTx, _ *models.ChannelEdgeInfo, outEdge,
1144+
func(_ *models.ChannelEdgeInfo, outEdge,
10541145
inEdge *models.ChannelEdgePolicy) error {
10551146

10561147
// All channels between first and second node should
@@ -1126,26 +1217,15 @@ func TestGraphTraversalCacheable(t *testing.T) {
11261217
require.NoError(t, err)
11271218
require.Len(t, nodeMap, 0)
11281219

1129-
err = graph.db.View(func(tx kvdb.RTx) error {
1130-
for _, node := range nodes {
1131-
err := graph.ForEachNodeChannelTx(tx, node,
1132-
func(tx kvdb.RTx, info *models.ChannelEdgeInfo,
1133-
policy *models.ChannelEdgePolicy,
1134-
policy2 *models.ChannelEdgePolicy) error { //nolint:ll
1135-
1136-
delete(chanIndex, info.ChannelID)
1137-
return nil
1138-
},
1139-
)
1140-
if err != nil {
1141-
return err
1142-
}
1143-
}
1144-
1145-
return nil
1146-
}, func() {})
1147-
1148-
require.NoError(t, err)
1220+
for _, node := range nodes {
1221+
err = graph.ForEachNodeDirectedChannel(
1222+
node, func(d *DirectedChannel) error {
1223+
delete(chanIndex, d.ChannelID)
1224+
return nil
1225+
},
1226+
)
1227+
require.NoError(t, err)
1228+
}
11491229
require.Len(t, chanIndex, 0)
11501230
}
11511231

@@ -2802,7 +2882,7 @@ func TestIncompleteChannelPolicies(t *testing.T) {
28022882

28032883
calls := 0
28042884
err := graph.ForEachNodeChannel(node.PubKeyBytes,
2805-
func(_ kvdb.RTx, _ *models.ChannelEdgeInfo, outEdge,
2885+
func(_ *models.ChannelEdgeInfo, outEdge,
28062886
inEdge *models.ChannelEdgePolicy) error {
28072887

28082888
if !expectedOut && outEdge != nil {
@@ -3921,8 +4001,7 @@ func BenchmarkForEachChannel(b *testing.B) {
39214001
require.NoError(b, err)
39224002

39234003
for _, n := range nodes {
3924-
cb := func(tx kvdb.RTx,
3925-
info *models.ChannelEdgeInfo,
4004+
cb := func(info *models.ChannelEdgeInfo,
39264005
policy *models.ChannelEdgePolicy,
39274006
policy2 *models.ChannelEdgePolicy) error { //nolint:ll
39284007

graph/db/kv_store.go

Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -299,29 +299,6 @@ var graphTopLevelBuckets = [][]byte{
299299
closedScidBucket,
300300
}
301301

302-
// Wipe completely deletes all saved state within all used buckets within the
303-
// database. The deletion is done in a single transaction, therefore this
304-
// operation is fully atomic.
305-
func (c *KVStore) Wipe() error {
306-
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
307-
for _, tlb := range graphTopLevelBuckets {
308-
err := tx.DeleteTopLevelBucket(tlb)
309-
if err != nil &&
310-
!errors.Is(err, kvdb.ErrBucketNotFound) {
311-
312-
return err
313-
}
314-
}
315-
316-
return nil
317-
}, func() {})
318-
if err != nil {
319-
return err
320-
}
321-
322-
return initKVStore(c.db)
323-
}
324-
325302
// createChannelDB creates and initializes a fresh version of In
326303
// the case that the target path has not yet been created or doesn't yet exist,
327304
// then the path is created. Additionally, all required top-level buckets used
@@ -584,7 +561,7 @@ func (c *KVStore) ForEachNodeCached(cb func(node route.Vertex,
584561

585562
channels := make(map[uint64]*DirectedChannel)
586563

587-
err := c.ForEachNodeChannelTx(tx, node.PubKeyBytes,
564+
err := c.forEachNodeChannelTx(tx, node.PubKeyBytes,
588565
func(tx kvdb.RTx, e *models.ChannelEdgeInfo,
589566
p1 *models.ChannelEdgePolicy,
590567
p2 *models.ChannelEdgePolicy) error {
@@ -2873,7 +2850,7 @@ func (c *KVStore) isPublic(tx kvdb.RTx, nodePub route.Vertex,
28732850
// used to terminate the check early.
28742851
nodeIsPublic := false
28752852
errDone := errors.New("done")
2876-
err := c.ForEachNodeChannelTx(tx, nodePub, func(tx kvdb.RTx,
2853+
err := c.forEachNodeChannelTx(tx, nodePub, func(tx kvdb.RTx,
28772854
info *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy,
28782855
_ *models.ChannelEdgePolicy) error {
28792856

@@ -3126,13 +3103,56 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
31263103
//
31273104
// Unknown policies are passed into the callback as nil values.
31283105
func (c *KVStore) ForEachNodeChannel(nodePub route.Vertex,
3129-
cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3106+
cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
31303107
*models.ChannelEdgePolicy) error) error {
31313108

3132-
return nodeTraversal(nil, nodePub[:], c.db, cb)
3109+
return nodeTraversal(nil, nodePub[:], c.db, func(_ kvdb.RTx,
3110+
info *models.ChannelEdgeInfo, policy,
3111+
policy2 *models.ChannelEdgePolicy) error {
3112+
3113+
return cb(info, policy, policy2)
3114+
})
3115+
}
3116+
3117+
// ForEachSourceNodeChannel iterates through all channels of the source node,
3118+
// executing the passed callback on each. The callback is provided with the
3119+
// channel's outpoint, whether we have a policy for the channel and the channel
3120+
// peer's node information.
3121+
func (c *KVStore) ForEachSourceNodeChannel(cb func(chanPoint wire.OutPoint,
3122+
havePolicy bool, otherNode *models.LightningNode) error) error {
3123+
3124+
return kvdb.View(c.db, func(tx kvdb.RTx) error {
3125+
nodes := tx.ReadBucket(nodeBucket)
3126+
if nodes == nil {
3127+
return ErrGraphNotFound
3128+
}
3129+
3130+
node, err := c.sourceNode(nodes)
3131+
if err != nil {
3132+
return err
3133+
}
3134+
3135+
return nodeTraversal(
3136+
tx, node.PubKeyBytes[:], c.db, func(tx kvdb.RTx,
3137+
info *models.ChannelEdgeInfo,
3138+
policy, _ *models.ChannelEdgePolicy) error {
3139+
3140+
peer, err := c.fetchOtherNode(
3141+
tx, info, node.PubKeyBytes[:],
3142+
)
3143+
if err != nil {
3144+
return err
3145+
}
3146+
3147+
return cb(
3148+
info.ChannelPoint, policy != nil, peer,
3149+
)
3150+
},
3151+
)
3152+
}, func() {})
31333153
}
31343154

3135-
// ForEachNodeChannelTx iterates through all channels of the given node,
3155+
// forEachNodeChannelTx iterates through all channels of the given node,
31363156
// executing the passed callback with an edge info structure and the policies
31373157
// of each end of the channel. The first edge policy is the outgoing edge *to*
31383158
// the connecting node, while the second is the incoming edge *from* the
@@ -3145,19 +3165,19 @@ func (c *KVStore) ForEachNodeChannel(nodePub route.Vertex,
31453165
// should be passed as the first argument. Otherwise, the first argument should
31463166
// be nil and a fresh transaction will be created to execute the graph
31473167
// traversal.
3148-
func (c *KVStore) ForEachNodeChannelTx(tx kvdb.RTx,
3168+
func (c *KVStore) forEachNodeChannelTx(tx kvdb.RTx,
31493169
nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo,
31503170
*models.ChannelEdgePolicy,
31513171
*models.ChannelEdgePolicy) error) error {
31523172

31533173
return nodeTraversal(tx, nodePub[:], c.db, cb)
31543174
}
31553175

3156-
// FetchOtherNode attempts to fetch the full LightningNode that's opposite of
3176+
// fetchOtherNode attempts to fetch the full LightningNode that's opposite of
31573177
// the target node in the channel. This is useful when one knows the pubkey of
31583178
// one of the nodes, and wishes to obtain the full LightningNode for the other
31593179
// end of the channel.
3160-
func (c *KVStore) FetchOtherNode(tx kvdb.RTx,
3180+
func (c *KVStore) fetchOtherNode(tx kvdb.RTx,
31613181
channel *models.ChannelEdgeInfo, thisNodeKey []byte) (
31623182
*models.LightningNode, error) {
31633183

@@ -4687,7 +4707,7 @@ func (c *chanGraphNodeTx) FetchNode(nodePub route.Vertex) (NodeRTx, error) {
46874707
func (c *chanGraphNodeTx) ForEachChannel(f func(*models.ChannelEdgeInfo,
46884708
*models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
46894709

4690-
return c.db.ForEachNodeChannelTx(c.tx, c.node.PubKeyBytes,
4710+
return c.db.forEachNodeChannelTx(c.tx, c.node.PubKeyBytes,
46914711
func(_ kvdb.RTx, info *models.ChannelEdgeInfo, policy1,
46924712
policy2 *models.ChannelEdgePolicy) error {
46934713

graph/interfaces.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/lightningnetwork/lnd/batch"
99
graphdb "github.com/lightningnetwork/lnd/graph/db"
1010
"github.com/lightningnetwork/lnd/graph/db/models"
11-
"github.com/lightningnetwork/lnd/kvdb"
1211
"github.com/lightningnetwork/lnd/lnwire"
1312
"github.com/lightningnetwork/lnd/routing/route"
1413
)
@@ -254,10 +253,9 @@ type DB interface {
254253
// to the caller.
255254
//
256255
// Unknown policies are passed into the callback as nil values.
257-
ForEachNodeChannel(nodePub route.Vertex, cb func(kvdb.RTx,
258-
*models.ChannelEdgeInfo,
259-
*models.ChannelEdgePolicy,
260-
*models.ChannelEdgePolicy) error) error
256+
ForEachNodeChannel(nodePub route.Vertex,
257+
cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
258+
*models.ChannelEdgePolicy) error) error
261259

262260
// AddEdgeProof sets the proof of an existing edge in the graph
263261
// database.

rpcserver.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ import (
5656
"github.com/lightningnetwork/lnd/input"
5757
"github.com/lightningnetwork/lnd/invoices"
5858
"github.com/lightningnetwork/lnd/keychain"
59-
"github.com/lightningnetwork/lnd/kvdb"
6059
"github.com/lightningnetwork/lnd/labels"
6160
"github.com/lightningnetwork/lnd/lncfg"
6261
"github.com/lightningnetwork/lnd/lnrpc"
@@ -6959,7 +6958,7 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context,
69596958
)
69606959

69616960
err = graph.ForEachNodeChannel(node.PubKeyBytes,
6962-
func(_ kvdb.RTx, edge *models.ChannelEdgeInfo,
6961+
func(edge *models.ChannelEdgeInfo,
69636962
c1, c2 *models.ChannelEdgePolicy) error {
69646963

69656964
numChannels++
@@ -7641,7 +7640,7 @@ func (r *rpcServer) FeeReport(ctx context.Context,
76417640

76427641
var feeReports []*lnrpc.ChannelFeeReport
76437642
err = channelGraph.ForEachNodeChannel(selfNode.PubKeyBytes,
7644-
func(_ kvdb.RTx, chanInfo *models.ChannelEdgeInfo,
7643+
func(chanInfo *models.ChannelEdgeInfo,
76457644
edgePolicy, _ *models.ChannelEdgePolicy) error {
76467645

76477646
// Self node should always have policies for its

0 commit comments

Comments
 (0)