Skip to content

Commit 350aa4f

Browse files
authored
Surface transient channel errors (#11)
This PR better surfaces transient errors happening deeper in the channel to provide more information to the user.
1 parent f11c322 commit 350aa4f

File tree

8 files changed

+105
-35
lines changed

8 files changed

+105
-35
lines changed

Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ package final class Connection: Sendable {
4949
/// The connect attempt succeeded and the connection is ready to use.
5050
case connectSucceeded
5151
/// The connect attempt failed.
52-
case connectFailed(any Error)
52+
case connectFailed(RPCError)
5353
/// The connection received a GOAWAY and will close soon. No new streams
5454
/// should be opened on this connection.
5555
case goingAway(HTTP2ErrorCode, String)
@@ -68,7 +68,7 @@ package final class Connection: Sendable {
6868
/// Closed because the remote peer initiated shutdown (i.e. sent a GOAWAY frame).
6969
case remote
7070
/// Closed because the connection encountered an unexpected error.
71-
case error(any Error, wasIdle: Bool)
71+
case error(RPCError, wasIdle: Bool)
7272
}
7373

7474
/// Inputs to the 'run' method.
@@ -127,9 +127,20 @@ package final class Connection: Sendable {
127127
/// This function returns when the connection has closed. You can observe connection events
128128
/// by consuming the ``events`` sequence.
129129
package func run() async {
130-
let connectResult = await Result {
131-
try await self.http2Connector.establishConnection(to: self.address)
130+
func establishConnectionOrThrow() async throws(RPCError) -> HTTP2Connection {
131+
do {
132+
return try await self.http2Connector.establishConnection(to: self.address)
133+
} catch let error as RPCError {
134+
throw error
135+
} catch {
136+
throw RPCError(
137+
code: .unavailable,
138+
message: "Could not establish a connection to \(self.address).",
139+
cause: error
140+
)
141+
}
132142
}
143+
let connectResult = await Result(catching: establishConnectionOrThrow)
133144

134145
switch connectResult {
135146
case .success(let connected):
@@ -236,6 +247,7 @@ package final class Connection: Sendable {
236247
// This state is tracked here so that if the connection events sequence finishes and the
237248
// connection never became ready then the connection can report that the connect failed.
238249
var isReady = false
250+
var unexpectedCloseError: (any Error)?
239251

240252
func makeNeverReadyError(cause: (any Error)?) -> RPCError {
241253
return RPCError(
@@ -265,10 +277,15 @@ package final class Connection: Sendable {
265277
// The connection will close at some point soon, yield a notification for this
266278
// because the close might not be imminent and this could result in address resolution.
267279
self.event.continuation.yield(.goingAway(errorCode, reason))
268-
case .idle, .keepaliveExpired, .initiatedLocally, .unexpected:
280+
case .idle, .keepaliveExpired, .initiatedLocally:
269281
// The connection will be closed imminently in these cases, so there's no need to do
270282
// anything.
271283
()
284+
case .unexpected(let error, _):
285+
// The connection will be closed imminently in this case.
286+
// We'll store the error that caused the unexpected closure so we
287+
// can surface it.
288+
unexpectedCloseError = error
272289
}
273290

274291
// Take the reason with the highest precedence. A GOAWAY may be superseded by user
@@ -318,7 +335,7 @@ package final class Connection: Sendable {
318335
finalEvent = .closed(connectionCloseReason)
319336
} else {
320337
// The connection never became ready, this therefore counts as a failed connect attempt.
321-
finalEvent = .connectFailed(makeNeverReadyError(cause: nil))
338+
finalEvent = .connectFailed(makeNeverReadyError(cause: unexpectedCloseError))
322339
}
323340

324341
// The connection events sequence has finished: the connection is now closed.

Sources/GRPCNIOTransportCore/Client/Connection/ConnectivityState.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
* limitations under the License.
1515
*/
1616

17+
package import GRPCCore
18+
1719
package enum ConnectivityState: Sendable, Hashable {
1820
/// This channel isn't trying to create a connection because of a lack of new or pending RPCs.
1921
///
@@ -34,7 +36,7 @@ package enum ConnectivityState: Sendable, Hashable {
3436
/// establish a connection again. Since retries are done with exponential backoff, channels that
3537
/// fail to connect will start out spending very little time in this state but as the attempts
3638
/// fail repeatedly, the channel will spend increasingly large amounts of time in this state.
37-
case transientFailure
39+
case transientFailure(cause: RPCError)
3840

3941
/// This channel has started shutting down. Any new RPCs should fail immediately. Pending RPCs
4042
/// may continue running until the application cancels them. Channels may enter this state either

Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ extension GRPCChannel {
290290
}
291291

292292
case .failRPC:
293-
return .stopTrying(RPCError(code: .unavailable, message: "channel isn't ready"))
293+
return .stopTrying(RPCError(code: .unavailable, message: "Channel isn't ready."))
294294
}
295295
}
296296

@@ -300,7 +300,7 @@ extension GRPCChannel {
300300
loadBalancer: LoadBalancer
301301
) async -> MakeStreamResult {
302302
guard let subchannel = loadBalancer.pickSubchannel() else {
303-
return .tryAgain(RPCError(code: .unavailable, message: "channel isn't ready"))
303+
return .tryAgain(RPCError(code: .unavailable, message: "Channel isn't ready."))
304304
}
305305

306306
let methodConfig = self.config(forMethod: descriptor)
@@ -758,14 +758,24 @@ extension GRPCChannel.StateMachine {
758758
result: .success(state.current)
759759
)
760760

761-
case .transientFailure, .shutdown: // shutdown includes shutting down
761+
case .transientFailure(let cause):
762762
// Current load-balancer failed. Remove all the 'fast-failing' continuations in the
763763
// queue, these are RPCs which set the 'wait for ready' option to false. The rest of
764764
// the entries in the queue will wait for a load-balancer to become ready.
765765
let continuations = state.queue.removeFastFailingEntries()
766766
actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
767767
continuations: continuations,
768-
result: .failure(RPCError(code: .unavailable, message: "channel isn't ready"))
768+
result: .failure(cause)
769+
)
770+
771+
case .shutdown: // shutdown includes shutting down
772+
// Current load-balancer is shut down. Remove all the 'fast-failing' continuations in the
773+
// queue, these are RPCs which set the 'wait for ready' option to false. The rest of
774+
// the entries in the queue will wait for a load-balancer to become ready.
775+
let continuations = state.queue.removeFastFailingEntries()
776+
actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
777+
continuations: continuations,
778+
result: .failure(RPCError(code: .unavailable, message: "Channel isn't ready."))
769779
)
770780

771781
case .idle, .connecting:

Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/RoundRobinLoadBalancer.swift

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ extension RoundRobinLoadBalancer {
530530
// The transition from transient failure to connecting is ignored.
531531
//
532532
// See: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
533-
if self.state == .transientFailure, newState == .connecting {
533+
if case .transientFailure = self.state, newState == .connecting {
534534
return false
535535
}
536536

@@ -750,12 +750,26 @@ extension ConnectivityState {
750750
return .idle
751751
}
752752

753-
// Otherwise, if all subchannels are in state TRANSIENT_FAILURE, the channel's state
754-
// is TRANSIENT_FAILURE.
755-
if states.allSatisfy({ $0 == .transientFailure }) {
756-
return .transientFailure
753+
// Otherwise, if all subchannels are in state TRANSIENT_FAILURE, the channel's state is TRANSIENT_FAILURE.
754+
var cause: RPCError?
755+
for state in states {
756+
switch state {
757+
case .transientFailure(let error):
758+
// Pick one of the errors to surface, as we can't surface all of them.
759+
cause = error
760+
case .shutdown:
761+
return .shutdown
762+
case .idle, .connecting, .ready:
763+
fatalError("Unreachable state: these should have been handled above.")
764+
}
757765
}
758766

759-
return .shutdown
767+
if let cause {
768+
return .transientFailure(cause: cause)
769+
} else {
770+
// We can only reach this point without a `cause` if `states` was empty.
771+
// Fall back to shutdown: we have nothing better to do.
772+
return .shutdown
773+
}
760774
}
761775
}

Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/Subchannel.swift

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,8 @@ extension Subchannel {
256256
switch event {
257257
case .connectSucceeded:
258258
self.handleConnectSucceededEvent()
259-
case .connectFailed:
260-
self.handleConnectFailedEvent(in: &group)
259+
case .connectFailed(let cause):
260+
self.handleConnectFailedEvent(in: &group, error: cause)
261261
case .goingAway:
262262
self.handleGoingAwayEvent()
263263
case .closed(let reason):
@@ -282,7 +282,7 @@ extension Subchannel {
282282
}
283283
}
284284

285-
private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup) {
285+
private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup, error: RPCError) {
286286
let onConnectFailed = self.state.withLock { $0.connectFailed(connector: self.connector) }
287287
switch onConnectFailed {
288288
case .connect(let connection):
@@ -291,7 +291,11 @@ extension Subchannel {
291291

292292
case .backoff(let duration):
293293
// All addresses have been tried, backoff for some time.
294-
self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
294+
self.event.continuation.yield(
295+
.connectivityStateChanged(
296+
.transientFailure(cause: error)
297+
)
298+
)
295299
group.addTask {
296300
do {
297301
try await Task.sleep(for: duration)
@@ -334,9 +338,9 @@ extension Subchannel {
334338
case .emitIdle:
335339
self.event.continuation.yield(.connectivityStateChanged(.idle))
336340

337-
case .emitTransientFailureAndReconnect:
341+
case .emitTransientFailureAndReconnect(let cause):
338342
// Unclean closes trigger a transient failure state change and a name resolution.
339-
self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
343+
self.event.continuation.yield(.connectivityStateChanged(.transientFailure(cause: cause)))
340344
self.event.continuation.yield(.requiresNameResolution)
341345
// Attempt to reconnect.
342346
self.handleConnectInput(in: &group)
@@ -632,7 +636,7 @@ extension Subchannel {
632636
enum OnClosed {
633637
case nothing
634638
case emitIdle
635-
case emitTransientFailureAndReconnect
639+
case emitTransientFailureAndReconnect(cause: RPCError)
636640
case finish(emitShutdown: Bool)
637641
}
638642

@@ -646,9 +650,21 @@ extension Subchannel {
646650
self = .notConnected(NotConnected(from: state))
647651
onClosed = .emitIdle
648652

649-
case .keepaliveTimeout, .error(_, wasIdle: false):
653+
case .keepaliveTimeout:
654+
self = .notConnected(NotConnected(from: state))
655+
onClosed = .emitTransientFailureAndReconnect(
656+
cause: RPCError(
657+
code: .unavailable,
658+
message: """
659+
The connection became unresponsive and was closed because the \
660+
keepalive timeout fired.
661+
"""
662+
)
663+
)
664+
665+
case .error(let error, wasIdle: false):
650666
self = .notConnected(NotConnected(from: state))
651-
onClosed = .emitTransientFailureAndReconnect
667+
onClosed = .emitTransientFailureAndReconnect(cause: error)
652668

653669
case .initiatedLocally:
654670
// Should be in the 'shuttingDown' state.

Sources/GRPCNIOTransportCore/Internal/Result+Catching.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
* limitations under the License.
1515
*/
1616

17-
extension Result where Failure == any Error {
17+
extension Result {
1818
/// Like `Result(catching:)`, but `async`.
1919
///
2020
/// - Parameter body: An `async` closure to catch the result of.
2121
@inlinable
22-
init(catching body: () async throws -> Success) async {
22+
init(catching body: () async throws(Failure) -> Success) async {
2323
do {
2424
self = .success(try await body())
2525
} catch {

Tests/GRPCNIOTransportCoreTests/Client/Connection/Connection+Equatable.swift

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,7 @@ extension Connection.CloseReason {
5454
return true
5555

5656
case (.error(let lhsError, let lhsStreams), .error(let rhsError, let rhsStreams)):
57-
if let lhs = lhsError as? RPCError, let rhs = rhsError as? RPCError {
58-
return lhs == rhs && lhsStreams == rhsStreams
59-
} else {
60-
return lhsStreams == rhsStreams
61-
}
57+
return lhsError == rhsError && lhsStreams == rhsStreams
6258

6359
default:
6460
return false

Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,15 @@ final class SubchannelTests: XCTestCase {
161161
[
162162
.connectivityStateChanged(.idle),
163163
.connectivityStateChanged(.connecting),
164-
.connectivityStateChanged(.transientFailure),
164+
.connectivityStateChanged(
165+
.transientFailure(
166+
cause: RPCError(
167+
code: .unavailable,
168+
message:
169+
"Could not establish a connection to [unix]test-connect-eventually-succeeds."
170+
)
171+
)
172+
),
165173
.connectivityStateChanged(.connecting),
166174
]
167175
)
@@ -440,7 +448,14 @@ final class SubchannelTests: XCTestCase {
440448
.connectivityStateChanged(.idle),
441449
.connectivityStateChanged(.connecting),
442450
.connectivityStateChanged(.ready),
443-
.connectivityStateChanged(.transientFailure),
451+
.connectivityStateChanged(
452+
.transientFailure(
453+
cause: RPCError(
454+
code: .unavailable,
455+
message: "The TCP connection was dropped unexpectedly."
456+
)
457+
)
458+
),
444459
.requiresNameResolution,
445460
.connectivityStateChanged(.connecting),
446461
.connectivityStateChanged(.ready),

0 commit comments

Comments
 (0)