Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ const (

defaultPrunedNodeMaxPeers = 4
defaultNeutrinoMaxPeers = 8

// defaultNoDisconnectOnPongFailure is the default value for whether we
// should *not* disconnect from a peer if we don't receive a pong
// response in time after we send a ping.
defaultNoDisconnectOnPongFailure = false
)

var (
Expand Down Expand Up @@ -527,6 +532,10 @@ type Config struct {
// NumRestrictedSlots is the number of restricted slots we'll allocate
// in the server.
NumRestrictedSlots uint64 `long:"num-restricted-slots" description:"The number of restricted slots we'll allocate in the server."`

// NoDisconnectOnPongFailure controls if we'll disconnect if a peer
// doesn't respond to a pong in time.
NoDisconnectOnPongFailure bool `long:"no-disconnect-on-pong-failure" description:"If true, a peer will *not* be disconnected if a pong is not received in time or is mismatched. Defaults to false, meaning peers *will* be disconnected on pong failure."`
}

// GRPCConfig holds the configuration options for the gRPC server.
Expand Down Expand Up @@ -747,10 +756,11 @@ func DefaultConfig() Config {
ServerPingTimeout: defaultGrpcServerPingTimeout,
ClientPingMinWait: defaultGrpcClientPingMinWait,
},
LogConfig: build.DefaultLogConfig(),
WtClient: lncfg.DefaultWtClientCfg(),
HTTPHeaderTimeout: DefaultHTTPHeaderTimeout,
NumRestrictedSlots: DefaultNumRestrictedSlots,
LogConfig: build.DefaultLogConfig(),
WtClient: lncfg.DefaultWtClientCfg(),
HTTPHeaderTimeout: DefaultHTTPHeaderTimeout,
NumRestrictedSlots: DefaultNumRestrictedSlots,
NoDisconnectOnPongFailure: defaultNoDisconnectOnPongFailure,
}
}

Expand Down
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ when running LND with an aux component injected (custom channels).

* [remove x/exp/maps dependency](https://github.com/lightningnetwork/lnd/pull/9621)

* [Add a new configuration option](https://github.com/lightningnetwork/lnd/pull/9801)
`--no-disconnect-on-pong-failure` (defaulting to false) to control whether a
peer is disconnected if a pong message is not received in time or is mismatched.

## RPC Updates

* Some RPCs that previously just returned an empty response message now at least
Expand Down
32 changes: 26 additions & 6 deletions peer/brontide.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const (
torTimeoutMultiplier = 3

// msgStreamSize is the size of the message streams.
msgStreamSize = 5
msgStreamSize = 50
)

var (
Expand Down Expand Up @@ -455,6 +455,10 @@ type Config struct {
// experimental endorsement signals should be set.
ShouldFwdExpEndorsement func() bool

// NoDisconnectOnPongFailure indicates whether the peer should *not* be
// disconnected if a pong is not received in time or is mismatched.
NoDisconnectOnPongFailure bool

// Quit is the server's quit channel. If this is closed, we halt operation.
Quit chan struct{}
}
Expand Down Expand Up @@ -735,11 +739,27 @@ func NewBrontide(cfg Config) *Brontide {
SendPing: func(ping *lnwire.Ping) {
p.queueMsg(ping, nil)
},
OnPongFailure: func(err error) {
eStr := "pong response failure for %s: %v " +
"-- disconnecting"
p.log.Warnf(eStr, p, err)
go p.Disconnect(fmt.Errorf(eStr, p, err))
OnPongFailure: func(reason error,
timeWaitedForPong time.Duration,
lastKnownRTT time.Duration) {

logMsg := fmt.Sprintf("pong response "+
"failure for %s: %v. Time waited for this "+
"pong: %v. Last successful RTT: %v.",
p, reason, timeWaitedForPong, lastKnownRTT)

// If NoDisconnectOnPongFailure is true, we don't
// disconnect. Otherwise (if it's false, the default),
// we disconnect.
Comment on lines +751 to +753
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Comment can be removed, describes the code.

if p.cfg.NoDisconnectOnPongFailure {
p.log.Warnf("%s -- not disconnecting "+
"due to config", logMsg)
return
}

p.log.Warnf("%s -- disconnecting", logMsg)

go p.Disconnect(fmt.Errorf("pong failure: %w", reason))
},
})

Expand Down
109 changes: 79 additions & 30 deletions peer/ping_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync/atomic"
"time"

"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/lnwire"
)

Expand Down Expand Up @@ -36,7 +37,8 @@ type PingManagerConfig struct {
// OnPongFailure is a closure that is responsible for executing the
// logic when a Pong message is either late or does not match our
// expectations for that Pong
OnPongFailure func(error)
OnPongFailure func(failureReason error, timeWaitedForPong time.Duration,
lastKnownRTT time.Duration)
}

// PingManager is a structure that is designed to manage the internal state
Expand Down Expand Up @@ -108,6 +110,26 @@ func (m *PingManager) Start() error {
return err
}

// getLastRTT safely retrieves the last known RTT, returning 0 if none exists.
func (m *PingManager) getLastRTT() time.Duration {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Use fn.Option here as well similar as below pendingPingWait ?

rttPtr := m.pingTime.Load()
if rttPtr == nil {
return 0
}

return *rttPtr
}

// pendingPingWait calculates the time waited since the last ping was sent. If
// no ping time is reported, None is returned. defaultDuration.
func (m *PingManager) pendingPingWait() fn.Option[time.Duration] {
if m.pingLastSend != nil {
return fn.Some(time.Since(*m.pingLastSend))
}

return fn.None[time.Duration]()
}

// pingHandler is the main goroutine responsible for enforcing the ping/pong
// protocol.
func (m *PingManager) pingHandler() {
Expand All @@ -119,6 +141,10 @@ func (m *PingManager) pingHandler() {
<-m.pingTimeout.C
}

// Because we don't know if the OnPingFailure callback actually
// disconnects a peer (dependent on user config), we should never return
// from this loop unless the ping manager is stopped explicitly (which
// happens on disconnect).
for {
select {
case <-m.pingTicker.C:
Expand All @@ -127,12 +153,20 @@ func (m *PingManager) pingHandler() {
// awaiting a pong response. This should never occur,
// but if it does, it implies a timeout.
if m.outstandingPongSize >= 0 {
e := errors.New("impossible: new ping" +
"in unclean state",
// Ping was outstanding, meaning it timed out by
// the arrival of the next ping interval.
timeWaited := m.pendingPingWait().UnwrapOr(
m.cfg.IntervalDuration,
)
lastRTT := m.getLastRTT()

m.cfg.OnPongFailure(
errors.New("ping timed "+
"out by next interval"),
timeWaited, lastRTT,
)
m.cfg.OnPongFailure(e)

return
m.resetPingState()
}

pongSize := m.cfg.NewPongSize()
Expand All @@ -143,53 +177,67 @@ func (m *PingManager) pingHandler() {

// Set up our bookkeeping for the new Ping.
if err := m.setPingState(pongSize); err != nil {
m.cfg.OnPongFailure(err)
// This is an internal error related to timer
// reset. Pass it to OnPongFailure as it's
// critical. Current and last RTT are not
// directly applicable here.
m.cfg.OnPongFailure(err, 0, 0)

m.resetPingState()

return
continue
}

m.cfg.SendPing(ping)

case <-m.pingTimeout.C:
m.resetPingState()

e := errors.New("timeout while waiting for " +
"pong response",
timeWaited := m.pendingPingWait().UnwrapOr(
m.cfg.TimeoutDuration,
)
lastRTT := m.getLastRTT()

m.cfg.OnPongFailure(e)
m.cfg.OnPongFailure(
errors.New("timeout while waiting for "+
"pong response"),
timeWaited, lastRTT,
)

return
m.resetPingState()

case pong := <-m.pongChan:
pongSize := int32(len(pong.PongBytes))

// Save off values we are about to override when we
// call resetPingState.
// Save off values we are about to override when we call
// resetPingState.
expected := m.outstandingPongSize
lastPing := m.pingLastSend
lastPingTime := m.pingLastSend

m.resetPingState()
// This is an unexpected pong, we'll continue.
if lastPingTime == nil {
continue
}

// If the pong we receive doesn't match the ping we
// sent out, then we fail out.
actualRTT := time.Since(*lastPingTime)

// If the pong we receive doesn't match the ping we sent
// out, then we fail out.
if pongSize != expected {
e := errors.New("pong response does " +
"not match expected size",
)
e := fmt.Errorf("pong response does not match "+
"expected size. Expected: %d, Got: %d",
expected, pongSize)

m.cfg.OnPongFailure(e)
lastRTT := m.getLastRTT()
m.cfg.OnPongFailure(e, actualRTT, lastRTT)

return
}
m.resetPingState()

// Compute RTT of ping and save that for future
// querying.
if lastPing != nil {
rtt := time.Since(*lastPing)
m.pingTime.Store(&rtt)
continue
}

// Pong is good, update RTT and reset state.
m.pingTime.Store(&actualRTT)
m.resetPingState()

case <-m.quit:
return
}
Expand Down Expand Up @@ -231,6 +279,7 @@ func (m *PingManager) setPingState(pongSize uint16) error {
func (m *PingManager) resetPingState() {
m.pingLastSend = nil
m.outstandingPongSize = -1

if !m.pingTimeout.Stop() {
select {
case <-m.pingTimeout.C:
Expand Down
Loading
Loading