Skip to content

Commit 45bc5dc

Browse files
committed
Surface transient connection errors
1 parent f11c322 commit 45bc5dc

File tree

7 files changed

+79
-31
lines changed

7 files changed

+79
-31
lines changed

Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ package final class Connection: Sendable {
6868
/// Closed because the remote peer initiate shutdown (i.e. sent a GOAWAY frame).
6969
case remote
7070
/// Closed because the connection encountered an unexpected error.
71-
case error(any Error, wasIdle: Bool)
71+
case error(RPCError, wasIdle: Bool)
7272
}
7373

7474
/// Inputs to the 'run' method.
@@ -236,6 +236,7 @@ package final class Connection: Sendable {
236236
// This state is tracked here so that if the connection events sequence finishes and the
237237
// connection never became ready then the connection can report that the connect failed.
238238
var isReady = false
239+
var unexpectedCloseError: (any Error)?
239240

240241
func makeNeverReadyError(cause: (any Error)?) -> RPCError {
241242
return RPCError(
@@ -265,10 +266,15 @@ package final class Connection: Sendable {
265266
// The connection will close at some point soon, yield a notification for this
266267
// because the close might not be imminent and this could result in address resolution.
267268
self.event.continuation.yield(.goingAway(errorCode, reason))
268-
case .idle, .keepaliveExpired, .initiatedLocally, .unexpected:
269+
case .idle, .keepaliveExpired, .initiatedLocally:
269270
// The connection will be closed imminently in these cases there's no need to do
270271
// anything.
271272
()
273+
case .unexpected(let error, _):
274+
// The connection will be closed imminently in this case.
275+
// We'll store the error that caused the unexpected closure so we
276+
// can surface it.
277+
unexpectedCloseError = error
272278
}
273279

274280
// Take the reason with the highest precedence. A GOAWAY may be superseded by user
@@ -318,7 +324,7 @@ package final class Connection: Sendable {
318324
finalEvent = .closed(connectionCloseReason)
319325
} else {
320326
// The connection never became ready, this therefore counts as a failed connect attempt.
321-
finalEvent = .connectFailed(makeNeverReadyError(cause: nil))
327+
finalEvent = .connectFailed(makeNeverReadyError(cause: unexpectedCloseError))
322328
}
323329

324330
// The connection events sequence has finished: the connection is now closed.

Sources/GRPCNIOTransportCore/Client/Connection/ConnectivityState.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
* limitations under the License.
1515
*/
1616

17+
package import GRPCCore
18+
1719
package enum ConnectivityState: Sendable, Hashable {
1820
/// This channel isn't trying to create a connection because of a lack of new or pending RPCs.
1921
///
@@ -34,7 +36,7 @@ package enum ConnectivityState: Sendable, Hashable {
3436
/// establish a connection again. Since retries are done with exponential backoff, channels that
3537
/// fail to connect will start out spending very little time in this state but as the attempts
3638
/// fail repeatedly, the channel will spend increasingly large amounts of time in this state.
37-
case transientFailure
39+
case transientFailure(cause: RPCError)
3840

3941
/// This channel has started shutting down. Any new RPCs should fail immediately. Pending RPCs
4042
/// may continue running until the application cancels them. Channels may enter this state either

Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ extension GRPCChannel {
290290
}
291291

292292
case .failRPC:
293-
return .stopTrying(RPCError(code: .unavailable, message: "channel isn't ready"))
293+
return .stopTrying(RPCError(code: .unavailable, message: "Channel isn't ready."))
294294
}
295295
}
296296

@@ -300,7 +300,7 @@ extension GRPCChannel {
300300
loadBalancer: LoadBalancer
301301
) async -> MakeStreamResult {
302302
guard let subchannel = loadBalancer.pickSubchannel() else {
303-
return .tryAgain(RPCError(code: .unavailable, message: "channel isn't ready"))
303+
return .tryAgain(RPCError(code: .unavailable, message: "Channel isn't ready."))
304304
}
305305

306306
let methodConfig = self.config(forMethod: descriptor)
@@ -758,14 +758,30 @@ extension GRPCChannel.StateMachine {
758758
result: .success(state.current)
759759
)
760760

761-
case .transientFailure, .shutdown: // shutdown includes shutting down
761+
case .transientFailure(let cause):
762762
// Current load-balancer failed. Remove all the 'fast-failing' continuations in the
763763
// queue, these are RPCs which set the 'wait for ready' option to false. The rest of
764764
// the entries in the queue will wait for a load-balancer to become ready.
765765
let continuations = state.queue.removeFastFailingEntries()
766766
actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
767767
continuations: continuations,
768-
result: .failure(RPCError(code: .unavailable, message: "channel isn't ready"))
768+
result: .failure(
769+
RPCError(
770+
code: .unavailable,
771+
message: "Channel isn't ready.",
772+
cause: cause
773+
)
774+
)
775+
)
776+
777+
case .shutdown: // shutdown includes shutting down
778+
// Current load-balancer failed. Remove all the 'fast-failing' continuations in the
779+
// queue, these are RPCs which set the 'wait for ready' option to false. The rest of
780+
// the entries in the queue will wait for a load-balancer to become ready.
781+
let continuations = state.queue.removeFastFailingEntries()
782+
actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
783+
continuations: continuations,
784+
result: .failure(RPCError(code: .unavailable, message: "Channel isn't ready."))
769785
)
770786

771787
case .idle, .connecting:

Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/RoundRobinLoadBalancer.swift

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ extension RoundRobinLoadBalancer {
530530
// The transition from transient failure to connecting is ignored.
531531
//
532532
// See: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
533-
if self.state == .transientFailure, newState == .connecting {
533+
if case .transientFailure = self.state, newState == .connecting {
534534
return false
535535
}
536536

@@ -735,6 +735,10 @@ extension ConnectivityState {
735735
static func aggregate(_ states: some Collection<ConnectivityState>) -> ConnectivityState {
736736
// See https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
737737

738+
if states.isEmpty {
739+
return .shutdown
740+
}
741+
738742
// If any one subchannel is in READY state, the channel's state is READY.
739743
if states.contains(where: { $0 == .ready }) {
740744
return .ready
@@ -750,12 +754,16 @@ extension ConnectivityState {
750754
return .idle
751755
}
752756

753-
// Otherwise, if all subchannels are in state TRANSIENT_FAILURE, the channel's state
754-
// is TRANSIENT_FAILURE.
755-
if states.allSatisfy({ $0 == .transientFailure }) {
756-
return .transientFailure
757+
// Otherwise, if all subchannels are in state TRANSIENT_FAILURE, the channel's state is TRANSIENT_FAILURE.
758+
// Pick one of the errors to surface, as we can't surface all of them.
759+
var cause: RPCError!
760+
for state in states {
761+
guard case .transientFailure(let error) = state else {
762+
return .shutdown
763+
}
764+
cause = error
757765
}
758766

759-
return .shutdown
767+
return .transientFailure(cause: cause)
760768
}
761769
}

Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/Subchannel.swift

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,8 @@ extension Subchannel {
256256
switch event {
257257
case .connectSucceeded:
258258
self.handleConnectSucceededEvent()
259-
case .connectFailed:
260-
self.handleConnectFailedEvent(in: &group)
259+
case .connectFailed(let cause):
260+
self.handleConnectFailedEvent(in: &group, error: cause)
261261
case .goingAway:
262262
self.handleGoingAwayEvent()
263263
case .closed(let reason):
@@ -282,16 +282,25 @@ extension Subchannel {
282282
}
283283
}
284284

285-
private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup) {
285+
private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup, error: any Error) {
286286
let onConnectFailed = self.state.withLock { $0.connectFailed(connector: self.connector) }
287287
switch onConnectFailed {
288288
case .connect(let connection):
289289
// Try the next address.
290290
self.runConnection(connection, in: &group)
291291

292292
case .backoff(let duration):
293+
let transientFailureCause = (error as? RPCError) ?? RPCError(
294+
code: .unavailable,
295+
message: "All addresses have been tried: backing off.",
296+
cause: error
297+
)
293298
// All addresses have been tried, backoff for some time.
294-
self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
299+
self.event.continuation.yield(
300+
.connectivityStateChanged(
301+
.transientFailure(cause: transientFailureCause)
302+
)
303+
)
295304
group.addTask {
296305
do {
297306
try await Task.sleep(for: duration)
@@ -334,9 +343,9 @@ extension Subchannel {
334343
case .emitIdle:
335344
self.event.continuation.yield(.connectivityStateChanged(.idle))
336345

337-
case .emitTransientFailureAndReconnect:
346+
case .emitTransientFailureAndReconnect(let cause):
338347
// Unclean closes trigger a transient failure state change and a name resolution.
339-
self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
348+
self.event.continuation.yield(.connectivityStateChanged(.transientFailure(cause: cause)))
340349
self.event.continuation.yield(.requiresNameResolution)
341350
// Attempt to reconnect.
342351
self.handleConnectInput(in: &group)
@@ -632,7 +641,7 @@ extension Subchannel {
632641
enum OnClosed {
633642
case nothing
634643
case emitIdle
635-
case emitTransientFailureAndReconnect
644+
case emitTransientFailureAndReconnect(cause: RPCError)
636645
case finish(emitShutdown: Bool)
637646
}
638647

@@ -646,9 +655,15 @@ extension Subchannel {
646655
self = .notConnected(NotConnected(from: state))
647656
onClosed = .emitIdle
648657

649-
case .keepaliveTimeout, .error(_, wasIdle: false):
658+
case .keepaliveTimeout:
659+
self = .notConnected(NotConnected(from: state))
660+
onClosed = .emitTransientFailureAndReconnect(
661+
cause: RPCError(code: .unavailable, message: "The keepalive timed out.")
662+
)
663+
664+
case .error(let error, wasIdle: false):
650665
self = .notConnected(NotConnected(from: state))
651-
onClosed = .emitTransientFailureAndReconnect
666+
onClosed = .emitTransientFailureAndReconnect(cause: error)
652667

653668
case .initiatedLocally:
654669
// Should be in the 'shuttingDown' state.

Tests/GRPCNIOTransportCoreTests/Client/Connection/Connection+Equatable.swift

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,8 @@ extension Connection.CloseReason {
5353
(.remote, .remote):
5454
return true
5555

56-
case (.error(let lhsError, let lhsStreams), .error(let rhsError, let rhsStreams)):
57-
if let lhs = lhsError as? RPCError, let rhs = rhsError as? RPCError {
58-
return lhs == rhs && lhsStreams == rhsStreams
59-
} else {
60-
return lhsStreams == rhsStreams
61-
}
56+
case (.error(_, let lhsStreams), .error(_, let rhsStreams)):
57+
return lhs == rhs && lhsStreams == rhsStreams
6258

6359
default:
6460
return false

Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,10 @@ final class SubchannelTests: XCTestCase {
161161
[
162162
.connectivityStateChanged(.idle),
163163
.connectivityStateChanged(.connecting),
164-
.connectivityStateChanged(.transientFailure),
164+
.connectivityStateChanged(.transientFailure(cause: RPCError(
165+
code: .unavailable,
166+
message: "All addresses have been tried: backing off."
167+
))),
165168
.connectivityStateChanged(.connecting),
166169
]
167170
)
@@ -440,7 +443,9 @@ final class SubchannelTests: XCTestCase {
440443
.connectivityStateChanged(.idle),
441444
.connectivityStateChanged(.connecting),
442445
.connectivityStateChanged(.ready),
443-
.connectivityStateChanged(.transientFailure),
446+
.connectivityStateChanged(.transientFailure(cause: RPCError(
447+
code: .unavailable, message: "The TCP connection was dropped unexpectedly."
448+
))),
444449
.requiresNameResolution,
445450
.connectivityStateChanged(.connecting),
446451
.connectivityStateChanged(.ready),

0 commit comments

Comments
 (0)