Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion autopilot/prefattach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ func newDiskChanGraph(t *testing.T) (testGraph, error) {
})
require.NoError(t, err)

graphDB, err := graphdb.NewChannelGraph(&graphdb.Config{KVDB: backend})
graphStore, err := graphdb.NewKVStore(backend)
require.NoError(t, err)

graphDB, err := graphdb.NewChannelGraph(graphStore)
require.NoError(t, err)

require.NoError(t, graphDB.Start())
Expand Down
12 changes: 8 additions & 4 deletions config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,10 +1046,14 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
)
}

dbs.GraphDB, err = graphdb.NewChannelGraph(&graphdb.Config{
KVDB: databaseBackends.GraphDB,
KVStoreOpts: graphDBOptions,
}, chanGraphOpts...)
graphStore, err := graphdb.NewKVStore(
databaseBackends.GraphDB, graphDBOptions...,
)
if err != nil {
return nil, nil, err
}

dbs.GraphDB, err = graphdb.NewChannelGraph(graphStore, chanGraphOpts...)
if err != nil {
cleanUp()

Expand Down
5 changes: 5 additions & 0 deletions docs/release-notes/release-notes-0.20.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
# Improvements
## Functional Updates

* Graph Store SQL implementation and migration project:
* Introduce an [abstract graph
store](https://github.com/lightningnetwork/lnd/pull/9791) interface.


## RPC Updates

## lncli Updates
Expand Down
64 changes: 23 additions & 41 deletions graph/db/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
Expand All @@ -20,18 +19,6 @@ import (
// busy shutting down.
var ErrChanGraphShuttingDown = fmt.Errorf("ChannelGraph shutting down")

// Config is a struct that holds all the necessary dependencies for a
// ChannelGraph.
type Config struct {
// KVDB is the kvdb.Backend that will be used for initializing the
// KVStore CRUD layer.
KVDB kvdb.Backend

// KVStoreOpts is a list of functional options that will be used when
// initializing the KVStore.
KVStoreOpts []KVStoreOptionModifier
}

// ChannelGraph is a layer above the graph's CRUD layer.
//
// NOTE: currently, this is purely a pass-through layer directly to the backing
Expand All @@ -48,29 +35,24 @@ type ChannelGraph struct {

graphCache *GraphCache

*KVStore
V1Store
*topologyManager

quit chan struct{}
wg sync.WaitGroup
}

// NewChannelGraph creates a new ChannelGraph instance with the given backend.
func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
error) {
func NewChannelGraph(v1Store V1Store,
options ...ChanGraphOption) (*ChannelGraph, error) {

opts := defaultChanGraphOptions()
for _, o := range options {
o(opts)
}

store, err := NewKVStore(cfg.KVDB, cfg.KVStoreOpts...)
if err != nil {
return nil, err
}

g := &ChannelGraph{
KVStore: store,
V1Store: v1Store,
topologyManager: newTopologyManager(),
quit: make(chan struct{}),
}
Expand Down Expand Up @@ -184,7 +166,7 @@ func (c *ChannelGraph) populateCache() error {
log.Info("Populating in-memory channel graph, this might take a " +
"while...")

err := c.KVStore.ForEachNodeCacheable(func(node route.Vertex,
err := c.V1Store.ForEachNodeCacheable(func(node route.Vertex,
features *lnwire.FeatureVector) error {

c.graphCache.AddNodeFeatures(node, features)
Expand All @@ -195,7 +177,7 @@ func (c *ChannelGraph) populateCache() error {
return err
}

err = c.KVStore.ForEachChannel(func(info *models.ChannelEdgeInfo,
err = c.V1Store.ForEachChannel(func(info *models.ChannelEdgeInfo,
policy1, policy2 *models.ChannelEdgePolicy) error {

c.graphCache.AddChannel(info, policy1, policy2)
Expand Down Expand Up @@ -229,7 +211,7 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex,
return c.graphCache.ForEachChannel(node, cb)
}

return c.KVStore.ForEachNodeDirectedChannel(node, cb)
return c.V1Store.ForEachNodeDirectedChannel(node, cb)
}

// FetchNodeFeatures returns the features of the given node. If no features are
Expand All @@ -245,7 +227,7 @@ func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) (
return c.graphCache.GetFeatures(node), nil
}

return c.KVStore.FetchNodeFeatures(node)
return c.V1Store.FetchNodeFeatures(node)
}

// GraphSession will provide the call-back with access to a NodeTraverser
Expand All @@ -257,7 +239,7 @@ func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
return cb(c)
}

return c.KVStore.GraphSession(cb)
return c.V1Store.GraphSession(cb)
}

// ForEachNodeCached iterates through all the stored vertices/nodes in the
Expand All @@ -271,7 +253,7 @@ func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
return c.graphCache.ForEachNode(cb)
}

return c.KVStore.ForEachNodeCached(cb)
return c.V1Store.ForEachNodeCached(cb)
}

// AddLightningNode adds a vertex/node to the graph database. If the node is not
Expand All @@ -286,7 +268,7 @@ func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := c.KVStore.AddLightningNode(node, op...)
err := c.V1Store.AddLightningNode(node, op...)
if err != nil {
return err
}
Expand All @@ -312,7 +294,7 @@ func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := c.KVStore.DeleteLightningNode(nodePub)
err := c.V1Store.DeleteLightningNode(nodePub)
if err != nil {
return err
}
Expand All @@ -336,7 +318,7 @@ func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := c.KVStore.AddChannelEdge(edge, op...)
err := c.V1Store.AddChannelEdge(edge, op...)
if err != nil {
return err
}
Expand All @@ -361,15 +343,15 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := c.KVStore.MarkEdgeLive(chanID)
err := c.V1Store.MarkEdgeLive(chanID)
if err != nil {
return err
}

if c.graphCache != nil {
// We need to add the channel back into our graph cache,
// otherwise we won't use it for path finding.
infos, err := c.KVStore.FetchChanInfos([]uint64{chanID})
infos, err := c.V1Store.FetchChanInfos([]uint64{chanID})
if err != nil {
return err
}
Expand Down Expand Up @@ -400,7 +382,7 @@ func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool,
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

infos, err := c.KVStore.DeleteChannelEdges(
infos, err := c.V1Store.DeleteChannelEdges(
strictZombiePruning, markZombie, chanIDs...,
)
if err != nil {
Expand Down Expand Up @@ -432,7 +414,7 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

edges, err := c.KVStore.DisconnectBlockAtHeight(height)
edges, err := c.V1Store.DisconnectBlockAtHeight(height)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -463,7 +445,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

edges, nodes, err := c.KVStore.PruneGraph(
edges, nodes, err := c.V1Store.PruneGraph(
spentOutputs, blockHash, blockHeight,
)
if err != nil {
Expand Down Expand Up @@ -508,7 +490,7 @@ func (c *ChannelGraph) PruneGraphNodes() error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

nodes, err := c.KVStore.PruneGraphNodes()
nodes, err := c.V1Store.PruneGraphNodes()
if err != nil {
return err
}
Expand All @@ -530,7 +512,7 @@ func (c *ChannelGraph) PruneGraphNodes() error {
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {

unknown, knownZombies, err := c.KVStore.FilterKnownChanIDs(chansInfo)
unknown, knownZombies, err := c.V1Store.FilterKnownChanIDs(chansInfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -559,7 +541,7 @@ func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
// timestamps could bring it back from the dead, then we mark it
// alive, and we let it be added to the set of IDs to query our
// peer for.
err := c.KVStore.MarkEdgeLive(
err := c.V1Store.MarkEdgeLive(
info.ShortChannelID.ToUint64(),
)
// Since there is a chance that the edge could have been marked
Expand All @@ -583,7 +565,7 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := c.KVStore.MarkEdgeZombie(chanID, pubKey1, pubKey2)
err := c.V1Store.MarkEdgeZombie(chanID, pubKey1, pubKey2)
if err != nil {
return err
}
Expand All @@ -608,7 +590,7 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

from, to, err := c.KVStore.UpdateEdgePolicy(edge, op...)
from, to, err := c.V1Store.UpdateEdgePolicy(edge, op...)
if err != nil {
return err
}
Expand Down
41 changes: 21 additions & 20 deletions graph/db/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2950,6 +2950,12 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
graph, err := MakeTestGraph(t)
require.NoError(t, err, "unable to make test database")

// The update index only applies to the bbolt graph.
boltStore, ok := graph.V1Store.(*KVStore)
if !ok {
t.Skipf("skipping test that is aimed at a bbolt graph DB")
}

sourceNode := createTestVertex(t)
if err := graph.SetSourceNode(sourceNode); err != nil {
t.Fatalf("unable to set source node: %v", err)
Expand Down Expand Up @@ -2999,7 +3005,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
timestampSet[t] = struct{}{}
}

err := kvdb.View(graph.db, func(tx kvdb.RTx) error {
err := kvdb.View(boltStore.db, func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
Expand Down Expand Up @@ -3467,6 +3473,12 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) {
graph, err := MakeTestGraph(t)
require.NoError(t, err, "unable to make test database")

// This test currently directly edits the bytes stored in the bbolt DB.
boltStore, ok := graph.V1Store.(*KVStore)
if !ok {
t.Skipf("skipping test that is aimed at a bbolt graph DB")
}

// We'd like to test the update of edges inserted into the database, so
// we create two vertexes to connect.
node1 := createTestVertex(t)
Expand Down Expand Up @@ -3515,25 +3527,11 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) {

// Attempting to deserialize these bytes should return an error.
r := bytes.NewReader(stripped)
err = kvdb.View(graph.db, func(tx kvdb.RTx) error {
nodes := tx.ReadBucket(nodeBucket)
if nodes == nil {
return ErrGraphNotFound
}

_, err = deserializeChanEdgePolicy(r)
if err != ErrEdgePolicyOptionalFieldNotFound {
t.Fatalf("expected "+
"ErrEdgePolicyOptionalFieldNotFound, got %v",
err)
}

return nil
}, func() {})
require.NoError(t, err, "error reading db")
_, err = deserializeChanEdgePolicy(r)
require.ErrorIs(t, err, ErrEdgePolicyOptionalFieldNotFound)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lol, so the whole kvdb.View() call was basically a no-op in the first place...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed 🤓


// Put the stripped bytes in the DB.
err = kvdb.Update(graph.db, func(tx kvdb.RwTx) error {
err = kvdb.Update(boltStore.db, func(tx kvdb.RwTx) error {
edges := tx.ReadWriteBucket(edgeBucket)
if edges == nil {
return ErrEdgeNotFound
Expand Down Expand Up @@ -4086,7 +4084,10 @@ func TestGraphLoading(t *testing.T) {
defer backend.Close()
defer backendCleanup()

graph, err := NewChannelGraph(&Config{KVDB: backend})
graphStore, err := NewKVStore(backend)
require.NoError(t, err)

graph, err := NewChannelGraph(graphStore)
require.NoError(t, err)
require.NoError(t, graph.Start())
t.Cleanup(func() {
Expand All @@ -4100,7 +4101,7 @@ func TestGraphLoading(t *testing.T) {

// Recreate the graph. This should cause the graph cache to be
// populated.
graphReloaded, err := NewChannelGraph(&Config{KVDB: backend})
graphReloaded, err := NewChannelGraph(graphStore)
require.NoError(t, err)
require.NoError(t, graphReloaded.Start())
t.Cleanup(func() {
Expand Down
Loading
Loading