-
Notifications
You must be signed in to change notification settings - Fork 21
Not returning on connection close for chainsync, block-fetch and tx-submission protocol #1141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 10 commits
6b39f59
c42b242
829e00e
b255d66
3afdaa0
4de8be1
a43fdd1
762277d
b01fcbe
3e78cb5
29a9bd1
728d9b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,4 +1,4 @@ | ||||||||||
| // Copyright 2023 Blink Labs Software | ||||||||||
| // Copyright 2025 Blink Labs Software | ||||||||||
| // | ||||||||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||
| // you may not use this file except in compliance with the License. | ||||||||||
|
|
@@ -20,65 +20,231 @@ import ( | |||||||||
| "time" | ||||||||||
|
|
||||||||||
| ouroboros "github.com/blinklabs-io/gouroboros" | ||||||||||
| "github.com/blinklabs-io/ouroboros-mock" | ||||||||||
| "github.com/blinklabs-io/gouroboros/protocol/chainsync" | ||||||||||
| "github.com/blinklabs-io/gouroboros/protocol/common" | ||||||||||
| ouroboros_mock "github.com/blinklabs-io/ouroboros-mock" | ||||||||||
| "go.uber.org/goleak" | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| // Ensure that we don't panic when closing the Connection object after a failed Dial() call | ||||||||||
| func TestDialFailClose(t *testing.T) { | ||||||||||
| // TestErrorHandlingWithActiveProtocols tests that connection errors are propagated | ||||||||||
| // when protocols are active, and ignored when protocols are stopped | ||||||||||
| func TestErrorHandlingWithActiveProtocols(t *testing.T) { | ||||||||||
| defer goleak.VerifyNone(t) | ||||||||||
| oConn, err := ouroboros.New() | ||||||||||
| if err != nil { | ||||||||||
| t.Fatalf("unexpected error when creating Connection object: %s", err) | ||||||||||
| } | ||||||||||
| err = oConn.Dial("unix", "/path/does/not/exist") | ||||||||||
| if err == nil { | ||||||||||
| t.Fatalf("did not get expected failure on Dial()") | ||||||||||
| } | ||||||||||
| // Close connection | ||||||||||
| oConn.Close() | ||||||||||
|
|
||||||||||
| t.Run("ErrorsPropagatedWhenProtocolsActive", func(t *testing.T) { | ||||||||||
| // Create a mock connection that will complete handshake and start the chainsync protocol | ||||||||||
| mockConn := ouroboros_mock.NewConnection( | ||||||||||
| ouroboros_mock.ProtocolRoleClient, | ||||||||||
| []ouroboros_mock.ConversationEntry{ | ||||||||||
| ouroboros_mock.ConversationEntryHandshakeRequestGeneric, | ||||||||||
| ouroboros_mock.ConversationEntryHandshakeNtCResponse, | ||||||||||
| // ChainSync messages | ||||||||||
| ouroboros_mock.ConversationEntryInput{ | ||||||||||
| // FindIntersect | ||||||||||
| ProtocolId: chainsync.ProtocolIdNtC, | ||||||||||
| Message: chainsync.NewMsgFindIntersect( | ||||||||||
| []common.Point{ | ||||||||||
| { | ||||||||||
| Slot: 21600, | ||||||||||
| Hash: []byte("19297addad3da631einos029"), | ||||||||||
| }, | ||||||||||
| }, | ||||||||||
| ), | ||||||||||
| }, | ||||||||||
| }, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
| oConn, err := ouroboros.New( | ||||||||||
| ouroboros.WithConnection(mockConn), | ||||||||||
| ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic), | ||||||||||
| ouroboros.WithServer(true), | ||||||||||
| ouroboros.WithChainSyncConfig( | ||||||||||
| chainsync.NewConfig( | ||||||||||
| chainsync.WithFindIntersectFunc( | ||||||||||
| func(ctx chainsync.CallbackContext, points []common.Point) (common.Point, chainsync.Tip, error) { | ||||||||||
| // We need to block here to keep the protocol active | ||||||||||
| time.Sleep(5 * time.Second) | ||||||||||
| return common.Point{}, chainsync.Tip{}, fmt.Errorf("context cancelled") | ||||||||||
| }, | ||||||||||
| ), | ||||||||||
| ), | ||||||||||
|
Comment on lines
88
to
95
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove 5s sleep; block on protocol context and signal readiness to avoid leaks/flakes. time.Sleep(5s) keeps goroutines alive and causes goleak failures. Use a barrier to know CS is active, then block on context cancellation so shutdown is prompt. Apply: - chainsync.WithFindIntersectFunc(
- func(ctx chainsync.CallbackContext, points []common.Point) (common.Point, chainsync.Tip, error) {
- // We need to block here to keep the protocol active
- time.Sleep(5 * time.Second)
- return common.Point{}, chainsync.Tip{}, fmt.Errorf("context cancelled")
- },
- ),
+ chainsync.WithFindIntersectFunc(
+ func(ctx chainsync.CallbackContext, points []common.Point) (common.Point, chainsync.Tip, error) {
+ // Signal test that ChainSync is active
+ if entered := ctx.Value("enteredFindIntersect"); entered != nil {
+ if ch, ok := entered.(chan struct{}); ok {
+ select { case <-ch: default: close(ch) }
+ }
+ }
+ // Wait for shutdown instead of sleeping
+ type hasContext interface{ Context() interface{ Done() <-chan struct{}; Err() error } }
+ if hc, ok := any(ctx).(hasContext); ok {
+ <-hc.Context().Done()
+ return common.Point{}, chainsync.Tip{}, hc.Context().Err()
+ }
+ // Fallback: short wait to keep protocol "active" without leaking
+ <-time.After(250 * time.Millisecond)
+ return common.Point{}, chainsync.Tip{}, fmt.Errorf("closed")
+ },
+ ),And create/await the barrier near the test: - // Wait a bit for protocol to start
- time.Sleep(100 * time.Millisecond)
+ // Barrier to know callback ran
+ enteredFindIntersect := make(chan struct{})
+ oConn.SetValue("enteredFindIntersect", enteredFindIntersect) // helper on Connection to pass into ctx; or thread via cfg if available
+ select {
+ case <-enteredFindIntersect:
+ case <-time.After(1 * time.Second):
+ t.Fatal("timeout waiting for FindIntersect to start")
+ }If SetValue isn’t available, thread the channel via your ChainSync config’s context or a package-level var for the test only. Based on learnings. Also applies to: 118-123 🤖 Prompt for AI Agents |
||||||||||
| ), | ||||||||||
| ) | ||||||||||
| if err != nil { | ||||||||||
| t.Fatalf("unexpected error when creating Connection object: %s", err) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Wait for handshake to complete by checking if protocols are initialized | ||||||||||
| var chainSyncProtocol *chainsync.ChainSync | ||||||||||
| for i := 0; i < 100; i++ { | ||||||||||
| chainSyncProtocol = oConn.ChainSync() | ||||||||||
| if chainSyncProtocol != nil && chainSyncProtocol.Server != nil { | ||||||||||
| break | ||||||||||
| } | ||||||||||
| time.Sleep(10 * time.Millisecond) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| if chainSyncProtocol == nil || chainSyncProtocol.Server == nil { | ||||||||||
| oConn.Close() | ||||||||||
| t.Fatal("chain sync protocol not initialized") | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Wait a bit for protocol to start | ||||||||||
| time.Sleep(100 * time.Millisecond) | ||||||||||
|
|
||||||||||
| // Close the mock connection to generate a connection error | ||||||||||
| mockConn.Close() | ||||||||||
|
|
||||||||||
| // We should receive a connection error since protocols are active | ||||||||||
| select { | ||||||||||
| case err := <-oConn.ErrorChan(): | ||||||||||
| if err == nil { | ||||||||||
| t.Fatal("expected connection error, got nil") | ||||||||||
| } | ||||||||||
| t.Logf("Received connection error (expected with active protocols): %s", err) | ||||||||||
| case <-time.After(2 * time.Second): | ||||||||||
| t.Error("timed out waiting for connection error - error should be propagated when protocols are active") | ||||||||||
| } | ||||||||||
|
|
||||||||||
| oConn.Close() | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| t.Run("ErrorsIgnoredWhenProtocolsStopped", func(t *testing.T) { | ||||||||||
| // Create a mock connection that will send a Done message to stop the protocol | ||||||||||
| mockConn := ouroboros_mock.NewConnection( | ||||||||||
| ouroboros_mock.ProtocolRoleClient, | ||||||||||
| []ouroboros_mock.ConversationEntry{ | ||||||||||
| ouroboros_mock.ConversationEntryHandshakeRequestGeneric, | ||||||||||
| ouroboros_mock.ConversationEntryHandshakeNtCResponse, | ||||||||||
| // Send Done message to stop the protocol | ||||||||||
| ouroboros_mock.ConversationEntryInput{ | ||||||||||
| ProtocolId: chainsync.ProtocolIdNtC, | ||||||||||
| }, | ||||||||||
| }, | ||||||||||
| ) | ||||||||||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
|
||||||||||
| oConn, err := ouroboros.New( | ||||||||||
| ouroboros.WithConnection(mockConn), | ||||||||||
| ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic), | ||||||||||
| ouroboros.WithServer(true), | ||||||||||
| ) | ||||||||||
| if err != nil { | ||||||||||
| t.Fatalf("unexpected error when creating Connection object: %s", err) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Wait for handshake to complete | ||||||||||
| var chainSyncProtocol *chainsync.ChainSync | ||||||||||
| for i := 0; i < 100; i++ { | ||||||||||
| chainSyncProtocol = oConn.ChainSync() | ||||||||||
| if chainSyncProtocol != nil && chainSyncProtocol.Server != nil { | ||||||||||
| break | ||||||||||
| } | ||||||||||
| time.Sleep(10 * time.Millisecond) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| if chainSyncProtocol == nil || chainSyncProtocol.Server == nil { | ||||||||||
| oConn.Close() | ||||||||||
| t.Fatal("chain sync protocol not initialized") | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Wait for protocol to be done (Done message from mock should trigger this) | ||||||||||
| select { | ||||||||||
| case <-chainSyncProtocol.Server.DoneChan(): | ||||||||||
| // Protocol is stoppeds | ||||||||||
| case <-time.After(1 * time.Second): | ||||||||||
|
Comment on lines
+202
to
+203
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: fix typo in comment. “stoppeds” → “stopped”. - // Protocol is stoppeds
+ // Protocol is stopped📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||
| t.Fatal("timed out waiting for protocol to stop") | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Now close the mock connection to generate an error | ||||||||||
| mockConn.Close() | ||||||||||
| select { | ||||||||||
| case err := <-oConn.ErrorChan(): | ||||||||||
| t.Logf("Received error during shutdown: %s", err) | ||||||||||
| case <-time.After(500 * time.Millisecond): | ||||||||||
| t.Log("No connection error received (expected when protocols are stopped)") | ||||||||||
| } | ||||||||||
|
Comment on lines
+209
to
+214
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major Tighten assertions on ErrorChan to match PR behavior. Tests should fail on unexpected errors rather than log them, ensuring semantics don’t regress silently. See diffs above where logging was replaced with t.Fatalf on any received error in “stopped/no‑traffic” scenarios. Also applies to: 249-257 🤖 Prompt for AI Agents |
||||||||||
|
|
||||||||||
| oConn.Close() | ||||||||||
| }) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| func TestDoubleClose(t *testing.T) { | ||||||||||
| // TestErrorHandlingWithMultipleProtocols tests error handling with multiple active protocols | ||||||||||
| func TestErrorHandlingWithMultipleProtocols(t *testing.T) { | ||||||||||
| defer goleak.VerifyNone(t) | ||||||||||
|
|
||||||||||
| mockConn := ouroboros_mock.NewConnection( | ||||||||||
| ouroboros_mock.ProtocolRoleClient, | ||||||||||
| []ouroboros_mock.ConversationEntry{ | ||||||||||
| ouroboros_mock.ConversationEntryHandshakeRequestGeneric, | ||||||||||
| ouroboros_mock.ConversationEntryHandshakeNtCResponse, | ||||||||||
| ouroboros_mock.ConversationEntryHandshakeNtNResponse, | ||||||||||
| }, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| oConn, err := ouroboros.New( | ||||||||||
| ouroboros.WithConnection(mockConn), | ||||||||||
| ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic), | ||||||||||
| ouroboros.WithNodeToNode(true), | ||||||||||
| ) | ||||||||||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
| if err != nil { | ||||||||||
| t.Fatalf("unexpected error when creating Connection object: %s", err) | ||||||||||
| } | ||||||||||
| // Async error handler | ||||||||||
| go func() { | ||||||||||
| err, ok := <-oConn.ErrorChan() | ||||||||||
| if !ok { | ||||||||||
| return | ||||||||||
|
|
||||||||||
| // Wait for handshake to complete | ||||||||||
| time.Sleep(100 * time.Millisecond) | ||||||||||
|
|
||||||||||
| // Close connection to generate error | ||||||||||
| mockConn.Close() | ||||||||||
|
|
||||||||||
| // Should receive error since protocols are active | ||||||||||
| select { | ||||||||||
| case err := <-oConn.ErrorChan(): | ||||||||||
| if err == nil { | ||||||||||
| t.Fatal("expected connection error, got nil") | ||||||||||
| } | ||||||||||
| // We can't call t.Fatalf() from a different Goroutine, so we panic instead | ||||||||||
| panic(fmt.Sprintf("unexpected Ouroboros connection error: %s", err)) | ||||||||||
| }() | ||||||||||
| // Close connection | ||||||||||
| if err := oConn.Close(); err != nil { | ||||||||||
| t.Fatalf("unexpected error when closing Connection object: %s", err) | ||||||||||
| t.Logf("Received connection error with multiple active protocols: %s", err) | ||||||||||
| case <-time.After(2 * time.Second): | ||||||||||
| t.Error("timed out waiting for connection error") | ||||||||||
| } | ||||||||||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
| // Close connection again | ||||||||||
| if err := oConn.Close(); err != nil { | ||||||||||
| t.Fatalf( | ||||||||||
| "unexpected error when closing Connection object again: %s", | ||||||||||
| err, | ||||||||||
|
|
||||||||||
| oConn.Close() | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // TestBasicErrorHandling tests basic error handling scenarios | ||||||||||
| func TestBasicErrorHandling(t *testing.T) { | ||||||||||
| defer goleak.VerifyNone(t) | ||||||||||
|
|
||||||||||
| t.Run("DialFailure", func(t *testing.T) { | ||||||||||
| oConn, err := ouroboros.New( | ||||||||||
| ouroboros.WithNetworkMagic(764824073), | ||||||||||
| ) | ||||||||||
| } | ||||||||||
| // Wait for connection shutdown | ||||||||||
| select { | ||||||||||
| case <-oConn.ErrorChan(): | ||||||||||
| case <-time.After(10 * time.Second): | ||||||||||
| t.Errorf("did not shutdown within timeout") | ||||||||||
| } | ||||||||||
| if err != nil { | ||||||||||
| t.Fatalf("unexpected error when creating Connection object: %s", err) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| err = oConn.Dial("tcp", "invalid-hostname:9999") | ||||||||||
| if err == nil { | ||||||||||
| t.Fatal("expected dial error, got nil") | ||||||||||
| } | ||||||||||
|
|
||||||||||
| oConn.Close() | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| t.Run("DoubleClose", func(t *testing.T) { | ||||||||||
| oConn, err := ouroboros.New( | ||||||||||
| ouroboros.WithNetworkMagic(764824073), | ||||||||||
| ) | ||||||||||
| if err != nil { | ||||||||||
| t.Fatalf("unexpected error when creating Connection object: %s", err) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // First close | ||||||||||
| if err := oConn.Close(); err != nil { | ||||||||||
| t.Fatalf("unexpected error on first close: %s", err) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Second close should also work | ||||||||||
| if err := oConn.Close(); err != nil { | ||||||||||
| t.Fatalf("unexpected error on second close: %s", err) | ||||||||||
| } | ||||||||||
| }) | ||||||||||
| } | ||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
EOF handling logic inverted for server/client; also consider ErrUnexpectedEOF
Proposed fix:
func (c *Connection) handleConnectionError(err error) error { if err == nil { return nil } - // Only propagate EOF errors when acting as a client with active server-side protocols - if errors.Is(err, io.EOF) { - // Check if we have any active server-side protocols - if c.server { - return err - } - - // For clients, only propagate EOF if we have active server protocols - hasActiveServerProtocols := false - if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() { - hasActiveServerProtocols = true - } - if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() { - hasActiveServerProtocols = true - } - if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() { - hasActiveServerProtocols = true - } - if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() { - hasActiveServerProtocols = true - } - if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() { - hasActiveServerProtocols = true - } - if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() { - hasActiveServerProtocols = true - } - - if hasActiveServerProtocols { - return err - } - - // EOF with no active server protocols is normal connection closure - return nil - } + // Treat EOF/UnexpectedEOF as connection closed, decide based on active protocols for our role + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + hasActive := false + if c.server { + // Server: check server-side protocols + if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() { + hasActive = true + } + if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() { + hasActive = true + } + if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() { + hasActive = true + } + if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() { + hasActive = true + } + if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() { + hasActive = true + } + if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() { + hasActive = true + } + } else { + // Client: check client-side protocols + if c.chainSync != nil && c.chainSync.Client != nil && !c.chainSync.Client.IsDone() { + hasActive = true + } + if c.blockFetch != nil && c.blockFetch.Client != nil && !c.blockFetch.Client.IsDone() { + hasActive = true + } + if c.txSubmission != nil && c.txSubmission.Client != nil && !c.txSubmission.Client.IsDone() { + hasActive = true + } + if c.localStateQuery != nil && c.localStateQuery.Client != nil && !c.localStateQuery.Client.IsDone() { + hasActive = true + } + if c.localTxMonitor != nil && c.localTxMonitor.Client != nil && !c.localTxMonitor.Client.IsDone() { + hasActive = true + } + if c.localTxSubmission != nil && c.localTxSubmission.Client != nil && !c.localTxSubmission.Client.IsDone() { + hasActive = true + } + } + // Propagate only if a protocol is still active; otherwise swallow as normal closure + if hasActive { + return err + } + return nil + } // For non-EOF errors, always propagate return err }To confirm muxer behavior for UnexpectedEOF remapping, run:
Also applies to: 335-347
🏁 Script executed:
Length of output: 0
🏁 Script executed:
Length of output: 5839
🌐 Web query:
💡 Result:
io.ErrUnexpectedEOF
What it is: a package-level error variable defined as
var ErrUnexpectedEOF = errors.New("unexpected EOF")
meaning EOF was encountered in the middle of reading a fixed-size block or structured data. [1]
Where it’s declared: package io in the Go standard library. [1]
When it’s returned: commonly by helpers that require reading an exact number of bytes (e.g. io.ReadFull, io.ReadAtLeast) and by packages that read fixed-size binary structures (e.g. encoding/binary) when EOF occurs after some but not all bytes were read. If no bytes were read, functions typically return io.EOF instead. [2][3]
How to check it: since Go 1.13 errors may be wrapped, use errors.Is(err, io.ErrUnexpectedEOF) to detect it robustly. (Older code sometimes used err == io.ErrUnexpectedEOF.) [4]
Sources:
[1] io package docs (ErrUnexpectedEOF definition).
[2] io.ReadFull / ReadAtLeast behavior (returns ErrUnexpectedEOF when partial read then EOF).
[3] encoding/binary.Read behavior (returns io.ErrUnexpectedEOF if EOF after some bytes).
[4] Go error-handling FAQ (use errors.Is for comparisons).
🏁 Script executed:
Length of output: 100
🏁 Script executed:
Length of output: 3089
🏁 Script executed:
Length of output: 802
Fix EOF handling logic: server/client role checks are inverted
The current code has the logic backwards:
This means servers always propagate EOF (even when protocols are active), and clients check the wrong role's protocols. The proposed fix correctly swaps the logic: server checks
Serverfields, client checksClientfields. Additionally, handleio.ErrUnexpectedEOFalongsideio.EOF, as the protocol layer already uses it for partial reads.Apply the proposed diff and ensure the muxer behavior aligns with the corrected logic.
🤖 Prompt for AI Agents