Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ package final class Connection: Sendable {
/// The connect attempt succeeded and the connection is ready to use.
case connectSucceeded
/// The connect attempt failed.
case connectFailed(any Error)
case connectFailed(RPCError)
/// The connection received a GOAWAY and will close soon. No new streams
/// should be opened on this connection.
case goingAway(HTTP2ErrorCode, String)
Expand All @@ -68,7 +68,7 @@ package final class Connection: Sendable {
/// Closed because the remote peer initiate shutdown (i.e. sent a GOAWAY frame).
case remote
/// Closed because the connection encountered an unexpected error.
case error(any Error, wasIdle: Bool)
case error(RPCError, wasIdle: Bool)
}

/// Inputs to the 'run' method.
Expand Down Expand Up @@ -127,9 +127,20 @@ package final class Connection: Sendable {
/// This function returns when the connection has closed. You can observe connection events
/// by consuming the ``events`` sequence.
package func run() async {
let connectResult = await Result {
try await self.http2Connector.establishConnection(to: self.address)
func establishConnectionOrThrow() async throws(RPCError) -> HTTP2Connection {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the right gymnastics: the other way to do this is to be explicit with the types in the catching: closure:

await Result { () async throws(RPCError) -> HTTP2Connection in
  // ...
}

do {
return try await self.http2Connector.establishConnection(to: self.address)
} catch let error as RPCError {
throw error
} catch {
throw RPCError(
code: .unavailable,
message: "Could not establish a connection to \(self.address).",
cause: error
)
}
}
let connectResult = await Result(catching: establishConnectionOrThrow)

switch connectResult {
case .success(let connected):
Expand Down Expand Up @@ -236,6 +247,7 @@ package final class Connection: Sendable {
// This state is tracked here so that if the connection events sequence finishes and the
// connection never became ready then the connection can report that the connect failed.
var isReady = false
var unexpectedCloseError: (any Error)?

func makeNeverReadyError(cause: (any Error)?) -> RPCError {
return RPCError(
Expand Down Expand Up @@ -265,10 +277,15 @@ package final class Connection: Sendable {
// The connection will close at some point soon, yield a notification for this
// because the close might not be imminent and this could result in address resolution.
self.event.continuation.yield(.goingAway(errorCode, reason))
case .idle, .keepaliveExpired, .initiatedLocally, .unexpected:
case .idle, .keepaliveExpired, .initiatedLocally:
// The connection will be closed imminently in these cases there's no need to do
// anything.
()
case .unexpected(let error, _):
// The connection will be closed imminently in this case.
// We'll store the error that caused the unexpected closure so we
// can surface it.
unexpectedCloseError = error
}

// Take the reason with the highest precedence. A GOAWAY may be superseded by user
Expand Down Expand Up @@ -318,7 +335,7 @@ package final class Connection: Sendable {
finalEvent = .closed(connectionCloseReason)
} else {
// The connection never became ready, this therefore counts as a failed connect attempt.
finalEvent = .connectFailed(makeNeverReadyError(cause: nil))
finalEvent = .connectFailed(makeNeverReadyError(cause: unexpectedCloseError))
}

// The connection events sequence has finished: the connection is now closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

package import GRPCCore

package enum ConnectivityState: Sendable, Hashable {
/// This channel isn't trying to create a connection because of a lack of new or pending RPCs.
///
Expand All @@ -34,7 +36,7 @@ package enum ConnectivityState: Sendable, Hashable {
/// establish a connection again. Since retries are done with exponential backoff, channels that
/// fail to connect will start out spending very little time in this state but as the attempts
/// fail repeatedly, the channel will spend increasingly large amounts of time in this state.
case transientFailure
case transientFailure(cause: RPCError)

/// This channel has started shutting down. Any new RPCs should fail immediately. Pending RPCs
/// may continue running until the application cancels them. Channels may enter this state either
Expand Down
18 changes: 14 additions & 4 deletions Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ extension GRPCChannel {
}

case .failRPC:
return .stopTrying(RPCError(code: .unavailable, message: "channel isn't ready"))
return .stopTrying(RPCError(code: .unavailable, message: "Channel isn't ready."))
}
}

Expand All @@ -300,7 +300,7 @@ extension GRPCChannel {
loadBalancer: LoadBalancer
) async -> MakeStreamResult {
guard let subchannel = loadBalancer.pickSubchannel() else {
return .tryAgain(RPCError(code: .unavailable, message: "channel isn't ready"))
return .tryAgain(RPCError(code: .unavailable, message: "Channel isn't ready."))
}

let methodConfig = self.config(forMethod: descriptor)
Expand Down Expand Up @@ -758,14 +758,24 @@ extension GRPCChannel.StateMachine {
result: .success(state.current)
)

case .transientFailure, .shutdown: // shutdown includes shutting down
case .transientFailure(let cause):
// Current load-balancer failed. Remove all the 'fast-failing' continuations in the
// queue, these are RPCs which set the 'wait for ready' option to false. The rest of
// the entries in the queue will wait for a load-balancer to become ready.
let continuations = state.queue.removeFastFailingEntries()
actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
continuations: continuations,
result: .failure(RPCError(code: .unavailable, message: "channel isn't ready"))
result: .failure(cause)
)

case .shutdown: // shutdown includes shutting down
// Current load-balancer failed. Remove all the 'fast-failing' continuations in the
// queue, these are RPCs which set the 'wait for ready' option to false. The rest of
// the entries in the queue will wait for a load-balancer to become ready.
let continuations = state.queue.removeFastFailingEntries()
actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations(
continuations: continuations,
result: .failure(RPCError(code: .unavailable, message: "Channel isn't ready."))
)

case .idle, .connecting:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ extension RoundRobinLoadBalancer {
// The transition from transient failure to connecting is ignored.
//
// See: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
if self.state == .transientFailure, newState == .connecting {
if case .transientFailure = self.state, newState == .connecting {
return false
}

Expand Down Expand Up @@ -750,12 +750,26 @@ extension ConnectivityState {
return .idle
}

// Otherwise, if all subchannels are in state TRANSIENT_FAILURE, the channel's state
// is TRANSIENT_FAILURE.
if states.allSatisfy({ $0 == .transientFailure }) {
return .transientFailure
// Otherwise, if all subchannels are in state TRANSIENT_FAILURE, the channel's state is TRANSIENT_FAILURE.
var cause: RPCError?
for state in states {
switch state {
case .transientFailure(let error):
// Pick one of the errors to surface, as we can't surface all of them.
cause = error
case .shutdown:
return .shutdown
case .idle, .connecting, .ready:
fatalError("Unreachable state: these should have been handled above.")
}
}

return .shutdown
if let cause {
return .transientFailure(cause: cause)
} else {
// We can only reach this point without a `cause` if `states` was empty.
// Fall back to shutdown: we have nothing better to do.
return .shutdown
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ extension Subchannel {
switch event {
case .connectSucceeded:
self.handleConnectSucceededEvent()
case .connectFailed:
self.handleConnectFailedEvent(in: &group)
case .connectFailed(let cause):
self.handleConnectFailedEvent(in: &group, error: cause)
case .goingAway:
self.handleGoingAwayEvent()
case .closed(let reason):
Expand All @@ -282,7 +282,7 @@ extension Subchannel {
}
}

private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup) {
private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup, error: RPCError) {
let onConnectFailed = self.state.withLock { $0.connectFailed(connector: self.connector) }
switch onConnectFailed {
case .connect(let connection):
Expand All @@ -291,7 +291,11 @@ extension Subchannel {

case .backoff(let duration):
// All addresses have been tried, backoff for some time.
self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
self.event.continuation.yield(
.connectivityStateChanged(
.transientFailure(cause: error)
)
)
group.addTask {
do {
try await Task.sleep(for: duration)
Expand Down Expand Up @@ -334,9 +338,9 @@ extension Subchannel {
case .emitIdle:
self.event.continuation.yield(.connectivityStateChanged(.idle))

case .emitTransientFailureAndReconnect:
case .emitTransientFailureAndReconnect(let cause):
// Unclean closes trigger a transient failure state change and a name resolution.
self.event.continuation.yield(.connectivityStateChanged(.transientFailure))
self.event.continuation.yield(.connectivityStateChanged(.transientFailure(cause: cause)))
self.event.continuation.yield(.requiresNameResolution)
// Attempt to reconnect.
self.handleConnectInput(in: &group)
Expand Down Expand Up @@ -632,7 +636,7 @@ extension Subchannel {
enum OnClosed {
case nothing
case emitIdle
case emitTransientFailureAndReconnect
case emitTransientFailureAndReconnect(cause: RPCError)
case finish(emitShutdown: Bool)
}

Expand All @@ -646,9 +650,21 @@ extension Subchannel {
self = .notConnected(NotConnected(from: state))
onClosed = .emitIdle

case .keepaliveTimeout, .error(_, wasIdle: false):
case .keepaliveTimeout:
self = .notConnected(NotConnected(from: state))
onClosed = .emitTransientFailureAndReconnect(
cause: RPCError(
code: .unavailable,
message: """
The connection became unresponsive and was closed because the \
keepalive timeout fired.
"""
)
)

case .error(let error, wasIdle: false):
self = .notConnected(NotConnected(from: state))
onClosed = .emitTransientFailureAndReconnect
onClosed = .emitTransientFailureAndReconnect(cause: error)

case .initiatedLocally:
// Should be in the 'shuttingDown' state.
Expand Down
4 changes: 2 additions & 2 deletions Sources/GRPCNIOTransportCore/Internal/Result+Catching.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/

extension Result where Failure == any Error {
extension Result {
/// Like `Result(catching:)`, but `async`.
///
/// - Parameter body: An `async` closure to catch the result of.
@inlinable
init(catching body: () async throws -> Success) async {
init(catching body: () async throws(Failure) -> Success) async {
do {
self = .success(try await body())
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ extension Connection.CloseReason {
return true

case (.error(let lhsError, let lhsStreams), .error(let rhsError, let rhsStreams)):
if let lhs = lhsError as? RPCError, let rhs = rhsError as? RPCError {
return lhs == rhs && lhsStreams == rhsStreams
} else {
return lhsStreams == rhsStreams
}
return lhsError == rhsError && lhsStreams == rhsStreams

default:
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,15 @@ final class SubchannelTests: XCTestCase {
[
.connectivityStateChanged(.idle),
.connectivityStateChanged(.connecting),
.connectivityStateChanged(.transientFailure),
.connectivityStateChanged(
.transientFailure(
cause: RPCError(
code: .unavailable,
message:
"Could not establish a connection to [unix]test-connect-eventually-succeeds."
)
)
),
.connectivityStateChanged(.connecting),
]
)
Expand Down Expand Up @@ -440,7 +448,14 @@ final class SubchannelTests: XCTestCase {
.connectivityStateChanged(.idle),
.connectivityStateChanged(.connecting),
.connectivityStateChanged(.ready),
.connectivityStateChanged(.transientFailure),
.connectivityStateChanged(
.transientFailure(
cause: RPCError(
code: .unavailable,
message: "The TCP connection was dropped unexpectedly."
)
)
),
.requiresNameResolution,
.connectivityStateChanged(.connecting),
.connectivityStateChanged(.ready),
Expand Down
Loading