Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ when running LND with an aux component injected (custom channels).
the payment. This prevents payments from entering a path finding loop that
would eventually timeout.

* [Only send a `Ping`](https://github.com/lightningnetwork/lnd/pull/9805) if we
haven't received any messages from the peer for one minute.

## RPC Additions

* [Add a new rpc endpoint](https://github.com/lightningnetwork/lnd/pull/8843)
Expand Down
87 changes: 45 additions & 42 deletions peer/brontide.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ const (
// This MUST be a smaller value than the pingInterval.
pingTimeout = 30 * time.Second

// idleTimeout is the duration of inactivity before we time out a peer.
idleTimeout = 5 * time.Minute

// writeMessageTimeout is the timeout used when writing a message to the
// peer's connection buffer.
writeMessageTimeout = 5 * time.Second

// writeFlushTimeout is the time we wait until a message is flushed.
// Once the message is written to the connection, we will retry flushing
// it until this timeout is hit.
writeFlushTimeout = 5 * time.Minute

// readMessageTimeout is the timeout used when reading a message from a
// peer.
readMessageTimeout = 5 * time.Second
Expand Down Expand Up @@ -1990,14 +1992,6 @@ func newDiscMsgStream(p *Brontide) *msgStream {
func (p *Brontide) readHandler() {
defer p.cg.WgDone()

// We'll stop the timer after a new message is received, and also
// reset it after we process the next message.
idleTimer := time.AfterFunc(idleTimeout, func() {
err := fmt.Errorf("peer %s no answer for %s -- disconnecting",
p, idleTimeout)
p.Disconnect(err)
})

// Initialize our negotiated gossip sync method before reading messages
// off the wire. When using gossip queries, this ensures a gossip
// syncer is active by the time query messages arrive.
Expand All @@ -2009,15 +2003,10 @@ func (p *Brontide) readHandler() {
discStream := newDiscMsgStream(p)
discStream.Start()
defer discStream.Stop()

out:
for atomic.LoadInt32(&p.disconnect) == 0 {
nextMsg, err := p.readNextMessage()
if !idleTimer.Stop() {
select {
case <-idleTimer.C:
default:
}
}
if err != nil {
p.log.Infof("unable to read message from peer: %v", err)

Expand All @@ -2032,7 +2021,6 @@ out:
// compatible manner.
case *lnwire.UnknownMessage:
p.storeError(e)
idleTimer.Reset(idleTimeout)
continue

// If they sent us an address type that we don't yet
Expand All @@ -2041,7 +2029,6 @@ out:
// messages.
case *lnwire.ErrUnknownAddrType:
p.storeError(e)
idleTimer.Reset(idleTimeout)
continue

// If the NodeAnnouncement has an invalid alias, then
Expand All @@ -2050,17 +2037,24 @@ out:
// store this error because it is of little debugging
// value.
case *lnwire.ErrInvalidNodeAlias:
idleTimer.Reset(idleTimeout)
continue

// If the error we encountered wasn't just a message we
// didn't recognize, then we'll stop all processing as
// this is a fatal error.
default:
p.log.Errorf("read next msg err: %v", err)

break out
}
}

// Reset the ticker to delay sending the Ping. As long as we are
// receiving a message here, we know for sure the connection is
// alive, thus we can delay firing pings to check for
// liveness.
p.pingManager.ResetPingTicker()

// If a message router is active, then we'll try to have it
// handle this message. If it can, then we're able to skip the
// rest of the message handling logic.
Expand Down Expand Up @@ -2197,13 +2191,9 @@ out:
// into the channel's in-order message stream.
p.sendLinkUpdateMsg(targetChan, nextMsg)
}

idleTimer.Reset(idleTimeout)
}

p.Disconnect(errors.New("read handler closed"))

p.log.Trace("readHandler for peer done")
}

// handleCustomMessage handles the given custom message if a handler is
Expand Down Expand Up @@ -2623,20 +2613,19 @@ func (p *Brontide) writeMessage(msg lnwire.Message) error {
//
// NOTE: This method MUST be run as a goroutine.
func (p *Brontide) writeHandler() {
// We'll stop the timer after a new message is sent, and also reset it
// after we process the next message.
idleTimer := time.AfterFunc(idleTimeout, func() {
err := fmt.Errorf("peer %s no write for %s -- disconnecting",
p, idleTimeout)
p.Disconnect(err)
})

var exitErr error

// Create a timer to time out the write. We create it here to avoid
// allocations in the loop below.
flushTimer := time.NewTimer(writeFlushTimeout)

out:
for {
select {
case outMsg := <-p.sendQueue:
// Reset the timeout.
flushTimer.Reset(writeFlushTimeout)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given we are using go1.23, there's no need to call Stop anymore, reference.


// Record the time at which we first attempt to send the
// message.
startTime := time.Now()
Expand All @@ -2648,7 +2637,7 @@ out:
// slow to process messages from the wire.
err := p.writeMessage(outMsg.msg)
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
p.log.Debugf("Write timeout detected for "+
p.log.Warnf("Write timeout detected for "+
"peer, first write for message "+
"attempted %v ago",
time.Since(startTime))
Expand All @@ -2662,18 +2651,32 @@ out:
// reserializing or reencrypting it.
outMsg.msg = nil

goto retry
}

// The write succeeded, reset the idle timer to prevent
// us from disconnecting the peer.
if !idleTimer.Stop() {
// Check whether the flushing has already timed
// out. If so, we will exit and disconnect the
// peer with a timeout error.
select {
case <-idleTimer.C:
case <-flushTimer.C:
exitErr = errors.New("write timeout")

break out

default:
}

goto retry
}
idleTimer.Reset(idleTimeout)

// Reset the ticker to delay sending the Ping. Since
// we've successfully written a msg to the connection,
// we know for sure the connection is alive, thus we can
// delay firing pings to check for liveness.
//
// NOTE: This means if the connection is idle, the
// interval of pings is not strictly one minute as we
// will reset it here after a successful write. This
// offset should be negligible if the connection is
// healthy.
p.pingManager.ResetPingTicker()

// If the peer requested a synchronous write, respond
// with the error.
Expand Down
5 changes: 5 additions & 0 deletions peer/ping_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (m *PingManager) Start() error {
return err
}

// ResetPingTicker calls Reset on the manager's internal `pingTicker`, passing
// the configured cfg.IntervalDuration. The peer's read and write handlers
// invoke it after a message is received or successfully written in order to
// delay the next Ping.
//
// NOTE(review): presumably `pingTicker` is only initialized once the manager
// has been started -- confirm callers never invoke this beforehand.
func (m *PingManager) ResetPingTicker() {
	interval := m.cfg.IntervalDuration
	m.pingTicker.Reset(interval)
}

// pingHandler is the main goroutine responsible for enforcing the ping/pong
// protocol.
func (m *PingManager) pingHandler() {
Expand Down
Loading