Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions autopilot/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ func New(cfg Config, initialState []LocalChannel) (*Agent, error) {

// Start starts the agent along with any goroutines it needs to perform its
// normal duties.
func (a *Agent) Start(ctx context.Context) error {
func (a *Agent) Start() error {
var err error
a.started.Do(func() {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background())
a.cancel = fn.Some(cancel)

err = a.start(ctx)
Expand Down
2 changes: 1 addition & 1 deletion autopilot/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func setup(t *testing.T, initialChans []LocalChannel) *testContext {

// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(context.Background()); err != nil {
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}

Expand Down
17 changes: 4 additions & 13 deletions autopilot/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/fn/v2"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
Expand Down Expand Up @@ -55,9 +54,8 @@ type Manager struct {
// disabled.
pilot *Agent

quit chan struct{}
wg sync.WaitGroup
cancel fn.Option[context.CancelFunc]
quit chan struct{}
wg sync.WaitGroup
sync.Mutex
}

Expand All @@ -83,7 +81,6 @@ func (m *Manager) Stop() error {
log.Errorf("Unable to stop pilot: %v", err)
}

m.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
close(m.quit)
m.wg.Wait()
})
Expand All @@ -100,16 +97,14 @@ func (m *Manager) IsActive() bool {

// StartAgent creates and starts an autopilot agent from the Manager's
// config.
func (m *Manager) StartAgent(ctx context.Context) error {
func (m *Manager) StartAgent() error {
m.Lock()
defer m.Unlock()

// Already active.
if m.pilot != nil {
return nil
}
ctx, cancel := context.WithCancel(ctx)
m.cancel = fn.Some(cancel)

// Next, we'll fetch the current state of open channels from the
// database to use as initial state for the auto-pilot agent.
Expand All @@ -125,7 +120,7 @@ func (m *Manager) StartAgent(ctx context.Context) error {
return err
}

if err := pilot.Start(ctx); err != nil {
if err := pilot.Start(); err != nil {
return err
}

Expand Down Expand Up @@ -169,8 +164,6 @@ func (m *Manager) StartAgent(ctx context.Context) error {
return
case <-m.quit:
return
case <-ctx.Done():
return
}
}

Expand Down Expand Up @@ -241,8 +234,6 @@ func (m *Manager) StartAgent(ctx context.Context) error {
return
case <-m.quit:
return
case <-ctx.Done():
return
}
}
}()
Expand Down
8 changes: 4 additions & 4 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,10 +643,10 @@ func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(

// Start spawns network messages handler goroutine and registers on new block
// notifications in order to properly handle the premature announcements.
func (d *AuthenticatedGossiper) Start(ctx context.Context) error {
func (d *AuthenticatedGossiper) Start() error {
var err error
d.started.Do(func() {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background())
d.cancel = fn.Some(cancel)

log.Info("Authenticated Gossiper starting")
Expand Down Expand Up @@ -674,11 +674,11 @@ func (d *AuthenticatedGossiper) start(ctx context.Context) error {
// Start the reliable sender. In case we had any pending messages ready
// to be sent when the gossiper was last shut down, we must continue on
// our quest to deliver them to their respective peers.
if err := d.reliableSender.Start(ctx); err != nil {
if err := d.reliableSender.Start(); err != nil {
return err
}

d.syncMgr.Start(ctx)
d.syncMgr.Start()

d.banman.start()

Expand Down
4 changes: 2 additions & 2 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ func createTestCtx(t *testing.T, startHeight uint32, isChanPeer bool) (
ScidCloser: newMockScidCloser(isChanPeer),
}, selfKeyDesc)

if err := gossiper.Start(context.Background()); err != nil {
if err := gossiper.Start(); err != nil {
return nil, fmt.Errorf("unable to start router: %w", err)
}

Expand Down Expand Up @@ -1692,7 +1692,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
KeyLocator: tCtx.gossiper.selfKeyLoc,
})
require.NoError(t, err, "unable to recreate gossiper")
if err := gossiper.Start(context.Background()); err != nil {
if err := gossiper.Start(); err != nil {
t.Fatalf("unable to start recreated gossiper: %v", err)
}
defer gossiper.Stop()
Expand Down
4 changes: 2 additions & 2 deletions discovery/reliable_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
}

// Start spawns message handlers for any peers with pending messages.
func (s *reliableSender) Start(ctx context.Context) error {
func (s *reliableSender) Start() error {
var err error
s.start.Do(func() {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background())
s.cancel = fn.Some(cancel)

err = s.resendPendingMsgs(ctx)
Expand Down
21 changes: 6 additions & 15 deletions discovery/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
Expand Down Expand Up @@ -201,9 +200,8 @@ type SyncManager struct {
// number of queries.
rateLimiter *rate.Limiter

wg sync.WaitGroup
quit chan struct{}
cancel fn.Option[context.CancelFunc]
wg sync.WaitGroup
quit chan struct{}
}

// newSyncManager constructs a new SyncManager backed by the given config.
Expand Down Expand Up @@ -248,13 +246,10 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
}

// Start starts the SyncManager in order to properly carry out its duties.
func (m *SyncManager) Start(ctx context.Context) {
func (m *SyncManager) Start() {
m.start.Do(func() {
ctx, cancel := context.WithCancel(ctx)
m.cancel = fn.Some(cancel)

m.wg.Add(1)
go m.syncerHandler(ctx)
go m.syncerHandler()
})
}

Expand All @@ -264,7 +259,6 @@ func (m *SyncManager) Stop() {
log.Debugf("SyncManager is stopping")
defer log.Debugf("SyncManager stopped")

m.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
close(m.quit)
m.wg.Wait()

Expand All @@ -288,7 +282,7 @@ func (m *SyncManager) Stop() {
// much of the public network as possible.
//
// NOTE: This must be run as a goroutine.
func (m *SyncManager) syncerHandler(ctx context.Context) {
func (m *SyncManager) syncerHandler() {
defer m.wg.Done()

m.cfg.RotateTicker.Resume()
Expand Down Expand Up @@ -386,7 +380,7 @@ func (m *SyncManager) syncerHandler(ctx context.Context) {
}
m.syncersMu.Unlock()

s.Start(ctx)
s.Start()

// Once we create the GossipSyncer, we'll signal to the
// caller that they can proceed since the SyncManager's
Expand Down Expand Up @@ -538,9 +532,6 @@ func (m *SyncManager) syncerHandler(ctx context.Context) {

case <-m.quit:
return

case <-ctx.Done():
return
}
}
}
Expand Down
19 changes: 9 additions & 10 deletions discovery/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package discovery

import (
"bytes"
"context"
"fmt"
"io"
"reflect"
Expand Down Expand Up @@ -83,7 +82,7 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) {
}

syncMgr := newPinnedTestSyncManager(numActiveSyncers, pinnedSyncers)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()

// First we'll start by adding the pinned syncers. These should
Expand Down Expand Up @@ -135,7 +134,7 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {

// We'll create our test sync manager to have two active syncers.
syncMgr := newTestSyncManager(2)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()

// The first will be an active syncer that performs a historical sync
Expand Down Expand Up @@ -188,7 +187,7 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) {

// We'll create our sync manager with three active syncers.
syncMgr := newTestSyncManager(1)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()

// The first syncer registered always performs a historical sync.
Expand Down Expand Up @@ -236,7 +235,7 @@ func TestSyncManagerNoInitialHistoricalSync(t *testing.T) {
t.Parallel()

syncMgr := newTestSyncManager(0)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()

// We should not expect any messages from the peer.
Expand Down Expand Up @@ -270,7 +269,7 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) {
t.Fatal("expected graph to not be considered as synced")
}

syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()

// We should expect to see a QueryChannelRange message with a
Expand Down Expand Up @@ -339,7 +338,7 @@ func TestSyncManagerHistoricalSyncOnReconnect(t *testing.T) {
t.Parallel()

syncMgr := newTestSyncManager(2)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()

// We should expect to see a QueryChannelRange message with a
Expand Down Expand Up @@ -373,7 +372,7 @@ func TestSyncManagerForceHistoricalSync(t *testing.T) {
t.Parallel()

syncMgr := newTestSyncManager(1)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()

// We should expect to see a QueryChannelRange message with a
Expand Down Expand Up @@ -411,7 +410,7 @@ func TestSyncManagerGraphSyncedAfterHistoricalSyncReplacement(t *testing.T) {
t.Parallel()

syncMgr := newTestSyncManager(1)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()

// We should expect to see a QueryChannelRange message with a
Expand Down Expand Up @@ -469,7 +468,7 @@ func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) {
// We'll start by creating our test sync manager which will hold up to
// 2 active syncers.
syncMgr := newTestSyncManager(numActiveSyncers)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()

// We'll go ahead and create our syncers.
Expand Down
4 changes: 2 additions & 2 deletions discovery/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,11 @@ func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {

// Start starts the GossipSyncer and any goroutines that it needs to carry out
// its duties.
func (g *GossipSyncer) Start(ctx context.Context) {
func (g *GossipSyncer) Start() {
g.started.Do(func() {
log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])

ctx, _ := g.cg.Create(ctx)
ctx, _ := g.cg.Create(context.Background())

// TODO(conner): only spawn channelGraphSyncer if remote
// supports gossip queries, and only spawn replyHandler if we
Expand Down
Loading
Loading