diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index 3b66186925f..d51c7241e36 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -207,6 +207,9 @@ when running LND with an aux component injected (custom channels). the payment. This prevents payments from entering a path finding loop that would eventually timeout. +* [Only send a `Ping`](https://github.com/lightningnetwork/lnd/pull/9805) if we + haven't received any messages from the peer for one minute. + ## RPC Additions * [Add a new rpc endpoint](https://github.com/lightningnetwork/lnd/pull/8843) diff --git a/peer/brontide.go b/peer/brontide.go index bfc603ae8af..0ef9a936265 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -65,13 +65,15 @@ const ( // This MUST be a smaller value than the pingInterval. pingTimeout = 30 * time.Second - // idleTimeout is the duration of inactivity before we time out a peer. - idleTimeout = 5 * time.Minute - // writeMessageTimeout is the timeout used when writing a message to the - // peer. + // connection buffer. writeMessageTimeout = 5 * time.Second + // writeFlushTimeout is the time we wait until a message is flushed. + // Once the message is written to the connection, we will retry flushing + // it until this timeout is hit. + writeFlushTimeout = 5 * time.Minute + // readMessageTimeout is the timeout used when reading a message from a // peer. readMessageTimeout = 5 * time.Second @@ -1990,14 +1992,6 @@ func newDiscMsgStream(p *Brontide) *msgStream { func (p *Brontide) readHandler() { defer p.cg.WgDone() - // We'll stop the timer after a new messages is received, and also - // reset it after we process the next message. - idleTimer := time.AfterFunc(idleTimeout, func() { - err := fmt.Errorf("peer %s no answer for %s -- disconnecting", - p, idleTimeout) - p.Disconnect(err) - }) - // Initialize our negotiated gossip sync method before reading messages // off the wire. When using gossip queries, this ensures a gossip // syncer is active by the time query messages arrive. @@ -2009,15 +2003,10 @@ func (p *Brontide) readHandler() { discStream := newDiscMsgStream(p) discStream.Start() defer discStream.Stop() + out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, err := p.readNextMessage() - if !idleTimer.Stop() { - select { - case <-idleTimer.C: - default: - } - } if err != nil { p.log.Infof("unable to read message from peer: %v", err) @@ -2032,7 +2021,6 @@ out: // compatible manner. case *lnwire.UnknownMessage: p.storeError(e) - idleTimer.Reset(idleTimeout) continue // If they sent us an address type that we don't yet @@ -2041,7 +2029,6 @@ out: // messages. case *lnwire.ErrUnknownAddrType: p.storeError(e) - idleTimer.Reset(idleTimeout) continue // If the NodeAnnouncement has an invalid alias, then @@ -2050,17 +2037,24 @@ out: // store this error because it is of little debugging // value. case *lnwire.ErrInvalidNodeAlias: - idleTimer.Reset(idleTimeout) continue // If the error we encountered wasn't just a message we // didn't recognize, then we'll stop all processing as // this is a fatal error. default: + p.log.Errorf("read next msg err: %v", err) + break out } } + // Reset the ticker to delay sending the Ping. As long as we are + // receiving a message here, we know for sure the connection is + // alive, thus we can delay firing pings to check for the + // liveness. + p.pingManager.ResetPingTicker() + // If a message router is active, then we'll try to have it // handle this message. If it can, then we're able to skip the // rest of the message handling logic. @@ -2197,13 +2191,9 @@ out: // into the channel's in-order message stream. p.sendLinkUpdateMsg(targetChan, nextMsg) } - - idleTimer.Reset(idleTimeout) } p.Disconnect(errors.New("read handler closed")) - - p.log.Trace("readHandler for peer done") } // handleCustomMessage handles the given custom message if a handler is @@ -2623,20 +2613,19 @@ func (p *Brontide) writeMessage(msg lnwire.Message) error { // // NOTE: This method MUST be run as a goroutine. func (p *Brontide) writeHandler() { - // We'll stop the timer after a new messages is sent, and also reset it - // after we process the next message. - idleTimer := time.AfterFunc(idleTimeout, func() { - err := fmt.Errorf("peer %s no write for %s -- disconnecting", - p, idleTimeout) - p.Disconnect(err) - }) - var exitErr error + // Create a timer to time out the write. We create it here to avoid + // allocations in the loop below. + flushTimer := time.NewTimer(writeFlushTimeout) + out: for { select { case outMsg := <-p.sendQueue: + // Reset the timeout. + flushTimer.Reset(writeFlushTimeout) + // Record the time at which we first attempt to send the // message. startTime := time.Now() @@ -2648,7 +2637,7 @@ out: // slow to process messages from the wire. err := p.writeMessage(outMsg.msg) if nerr, ok := err.(net.Error); ok && nerr.Timeout() { - p.log.Debugf("Write timeout detected for "+ + p.log.Warnf("Write timeout detected for "+ "peer, first write for message "+ "attempted %v ago", time.Since(startTime)) @@ -2662,18 +2651,32 @@ out: // reserializing or reencrypting it. outMsg.msg = nil - goto retry - } - - // The write succeeded, reset the idle timer to prevent - // us from disconnecting the peer. - if !idleTimer.Stop() { + // Check whether the flushing has already timed + // out. If so, we will exit and disconnect the + // peer with a timeout error. select { - case <-idleTimer.C: + case <-flushTimer.C: + exitErr = errors.New("write timeout") + + break out + default: } + + goto retry } - idleTimer.Reset(idleTimeout) + + // Reset the ticker to delay sending the Ping. Since + // we've successfully wriiten a msg to the connection, + // we know for sure the connection is alive, thus we can + // delay firing pings to check for the liveness. + // + // NOTE: This means if the connection is idle, the + // interval of pings is not strictly one minute as we + // will reset it here after a successful write. This + // offset should be negligible if the connection is + // healthy. + p.pingManager.ResetPingTicker() // If the peer requested a synchronous write, respond // with the error. diff --git a/peer/ping_manager.go b/peer/ping_manager.go index f5c6180be13..5ec5d179f3d 100644 --- a/peer/ping_manager.go +++ b/peer/ping_manager.go @@ -108,6 +108,11 @@ func (m *PingManager) Start() error { return err } +// ResetPingTicker resets the internal `pingTicker`. +func (m *PingManager) ResetPingTicker() { + m.pingTicker.Reset(m.cfg.IntervalDuration) +} + // pingHandler is the main goroutine responsible for enforcing the ping/pong // protocol. func (m *PingManager) pingHandler() {