Skip to content

Commit fb49efb

Browse files
committed
Feature: Add DecomposeResult to simplify error handling
This commit treats remote errors occurring during RPC, like a `Fatal` error, as an `Unreachable` error. This is due to Openraft's current inability to distinguish between an unreachable node and a broken node. - **Helper trait `DecomposeResult`:** Introduced to simplify handling composite errors. It converts a result of the form `Result<R, ErrorAOrB>` into a nested result `Result<Result<R, ErrorA>, ErrorB>`.
1 parent afb1897 commit fb49efb

File tree

7 files changed

+153
-24
lines changed

7 files changed

+153
-24
lines changed

openraft/src/error.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! Error types exposed by this crate.
22
3+
pub mod decompose;
4+
pub mod into_ok;
35
mod replication_closed;
46
mod streaming_error;
57

@@ -256,7 +258,7 @@ where
256258
StorageError(#[from] StorageError<NID>),
257259

258260
#[error(transparent)]
259-
RPCError(#[from] RPCError<NID, N, RaftError<NID, Infallible>>),
261+
RPCError(#[from] RPCError<NID, N>),
260262
}
261263

262264
/// Error occurs when invoking a remote raft API.
@@ -270,7 +272,7 @@ where
270272
serde(bound(serialize = "E: serde::Serialize")),
271273
serde(bound(deserialize = "E: for <'d> serde::Deserialize<'d>"))
272274
)]
273-
pub enum RPCError<NID: NodeId, N: Node, E: Error> {
275+
pub enum RPCError<NID: NodeId, N: Node, E: Error = Infallible> {
274276
#[error(transparent)]
275277
Timeout(#[from] Timeout<NID>),
276278

openraft/src/error/decompose.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::error::Error;
2+
3+
use crate::error::into_ok::into_ok;
4+
use crate::error::Fatal;
5+
use crate::error::Infallible;
6+
use crate::error::RPCError;
7+
use crate::error::RaftError;
8+
use crate::error::StreamingError;
9+
use crate::error::Unreachable;
10+
use crate::RaftTypeConfig;
11+
12+
/// Simplifies error handling by extracting the inner error from a composite error.
13+
/// For example, converting `Result<R, CompositeError>`
14+
/// to `Result<Result<R, Self::InnerError>, OuterError>`,
15+
/// where `SomeCompositeError` is a composite of `Self::InnerError` and `OuterError`.
16+
pub trait DecomposeResult<C, R, OuterError>
17+
where C: RaftTypeConfig
18+
{
19+
type InnerError;
20+
21+
fn decompose(self) -> Result<Result<R, Self::InnerError>, OuterError>;
22+
23+
/// Convert `Result<R, CompositeErr>`
24+
/// to `Result<R, E>`,
25+
/// if `Self::InnerError` is a infallible type.
26+
fn decompose_infallible(self) -> Result<R, OuterError>
27+
where
28+
Self::InnerError: Into<Infallible>,
29+
Self: Sized,
30+
{
31+
self.decompose().map(into_ok)
32+
}
33+
}
34+
35+
impl<C, R, E> DecomposeResult<C, R, RaftError<C::NodeId>> for Result<R, RaftError<C::NodeId, E>>
36+
where C: RaftTypeConfig
37+
{
38+
type InnerError = E;
39+
40+
fn decompose(self) -> Result<Result<R, E>, RaftError<C::NodeId>> {
41+
match self {
42+
Ok(r) => Ok(Ok(r)),
43+
Err(e) => match e {
44+
RaftError::APIError(e) => Ok(Err(e)),
45+
RaftError::Fatal(e) => Err(RaftError::Fatal(e)),
46+
},
47+
}
48+
}
49+
}
50+
51+
impl<C, R, E> DecomposeResult<C, R, RPCError<C::NodeId, C::Node>>
52+
for Result<R, RPCError<C::NodeId, C::Node, RaftError<C::NodeId, E>>>
53+
where
54+
C: RaftTypeConfig,
55+
E: Error,
56+
{
57+
type InnerError = E;
58+
59+
/// `RaftError::Fatal` is considered as `RPCError::Unreachable`.
60+
fn decompose(self) -> Result<Result<R, E>, RPCError<C::NodeId, C::Node>> {
61+
match self {
62+
Ok(r) => Ok(Ok(r)),
63+
Err(e) => match e {
64+
RPCError::Timeout(e) => Err(RPCError::Timeout(e)),
65+
RPCError::Unreachable(e) => Err(RPCError::Unreachable(e)),
66+
RPCError::PayloadTooLarge(e) => Err(RPCError::PayloadTooLarge(e)),
67+
RPCError::Network(e) => Err(RPCError::Network(e)),
68+
RPCError::RemoteError(e) => match e.source {
69+
RaftError::APIError(e) => Ok(Err(e)),
70+
RaftError::Fatal(e) => Err(RPCError::Unreachable(Unreachable::new(&e))),
71+
},
72+
},
73+
}
74+
}
75+
}
76+
77+
impl<C, R> DecomposeResult<C, R, StreamingError<C>> for Result<R, StreamingError<C, Fatal<C::NodeId>>>
78+
where C: RaftTypeConfig
79+
{
80+
type InnerError = Infallible;
81+
82+
/// `Fatal` is considered as `RPCError::Unreachable`.
83+
fn decompose(self) -> Result<Result<R, Self::InnerError>, StreamingError<C>> {
84+
match self {
85+
Ok(r) => Ok(Ok(r)),
86+
Err(e) => match e {
87+
StreamingError::Closed(e) => Err(StreamingError::Closed(e)),
88+
StreamingError::StorageError(e) => Err(StreamingError::StorageError(e)),
89+
StreamingError::Timeout(e) => Err(StreamingError::Timeout(e)),
90+
StreamingError::Unreachable(e) => Err(StreamingError::Unreachable(e)),
91+
StreamingError::Network(e) => Err(StreamingError::Network(e)),
92+
StreamingError::RemoteError(e) => Err(StreamingError::Unreachable(Unreachable::new(&e.source))),
93+
},
94+
}
95+
}
96+
}

openraft/src/error/into_ok.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use crate::error::Infallible;
2+
3+
/// Trait to convert `Result<T, E>` to `T`, if `E` is a `never` type.
4+
pub(crate) trait UnwrapInfallible<T> {
5+
fn into_ok(self) -> T;
6+
}
7+
8+
impl<T, E> UnwrapInfallible<T> for Result<T, E>
9+
where E: Into<Infallible>
10+
{
11+
fn into_ok(self) -> T {
12+
match self {
13+
Ok(t) => t,
14+
Err(_) => unreachable!(),
15+
}
16+
}
17+
}
18+
19+
/// Convert `Result<T, E>` to `T`, if `E` is a `never` type.
20+
pub(crate) fn into_ok<T, E>(result: Result<T, E>) -> T
21+
where E: Into<Infallible> {
22+
UnwrapInfallible::into_ok(result)
23+
}

openraft/src/error/streaming_error.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use std::error::Error;
22

3+
use crate::error::Fatal;
4+
use crate::error::Infallible;
35
use crate::error::NetworkError;
46
use crate::error::RPCError;
5-
use crate::error::RaftError;
67
use crate::error::RemoteError;
78
use crate::error::ReplicationClosed;
89
use crate::error::ReplicationError;
@@ -21,7 +22,7 @@ use crate::StorageError;
2122
serde(bound(serialize = "E: serde::Serialize")),
2223
serde(bound(deserialize = "E: for <'d> serde::Deserialize<'d>"))
2324
)]
24-
pub enum StreamingError<C: RaftTypeConfig, E: Error> {
25+
pub enum StreamingError<C: RaftTypeConfig, E: Error = Infallible> {
2526
/// The replication stream is closed intentionally.
2627
#[error(transparent)]
2728
Closed(#[from] ReplicationClosed),
@@ -47,25 +48,32 @@ pub enum StreamingError<C: RaftTypeConfig, E: Error> {
4748
RemoteError(#[from] RemoteError<C::NodeId, C::Node, E>),
4849
}
4950

50-
impl<C: RaftTypeConfig, E> From<StreamingError<C, E>> for ReplicationError<C::NodeId, C::Node>
51-
where
52-
E: Error,
53-
RaftError<C::NodeId>: From<E>,
54-
{
55-
fn from(e: StreamingError<C, E>) -> Self {
51+
impl<C: RaftTypeConfig> From<StreamingError<C, Fatal<C::NodeId>>> for ReplicationError<C::NodeId, C::Node> {
52+
fn from(e: StreamingError<C, Fatal<C::NodeId>>) -> Self {
5653
match e {
5754
StreamingError::Closed(e) => ReplicationError::Closed(e),
5855
StreamingError::StorageError(e) => ReplicationError::StorageError(e),
5956
StreamingError::Timeout(e) => ReplicationError::RPCError(RPCError::Timeout(e)),
6057
StreamingError::Unreachable(e) => ReplicationError::RPCError(RPCError::Unreachable(e)),
6158
StreamingError::Network(e) => ReplicationError::RPCError(RPCError::Network(e)),
6259
StreamingError::RemoteError(e) => {
63-
let remote_err = RemoteError {
64-
target: e.target,
65-
target_node: e.target_node,
66-
source: RaftError::from(e.source),
67-
};
68-
ReplicationError::RPCError(RPCError::RemoteError(remote_err))
60+
// Fatal on remote error is considered as unreachable.
61+
ReplicationError::RPCError(RPCError::Unreachable(Unreachable::new(&e.source)))
62+
}
63+
}
64+
}
65+
}
66+
67+
impl<C: RaftTypeConfig> From<StreamingError<C>> for ReplicationError<C::NodeId, C::Node> {
68+
fn from(e: StreamingError<C>) -> Self {
69+
match e {
70+
StreamingError::Closed(e) => ReplicationError::Closed(e),
71+
StreamingError::StorageError(e) => ReplicationError::StorageError(e),
72+
StreamingError::Timeout(e) => ReplicationError::RPCError(RPCError::Timeout(e)),
73+
StreamingError::Unreachable(e) => ReplicationError::RPCError(RPCError::Unreachable(e)),
74+
StreamingError::Network(e) => ReplicationError::RPCError(RPCError::Network(e)),
75+
StreamingError::RemoteError(_e) => {
76+
unreachable!("Infallible error should not be converted to ReplicationError")
6977
}
7078
}
7179
}

openraft/src/replication/callbacks.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! Callbacks for ReplicationCore internal communication.
22
use core::fmt;
33

4-
use crate::error::Fatal;
54
use crate::error::StreamingError;
65
use crate::raft::SnapshotResponse;
76
use crate::type_config::alias::InstantOf;
@@ -23,14 +22,14 @@ pub(crate) struct SnapshotCallback<C: RaftTypeConfig> {
2322
pub(crate) snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
2423

2524
/// The result of the snapshot replication.
26-
pub(crate) result: Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>,
25+
pub(crate) result: Result<SnapshotResponse<C::NodeId>, StreamingError<C>>,
2726
}
2827

2928
impl<C: RaftTypeConfig> SnapshotCallback<C> {
3029
pub(in crate::replication) fn new(
3130
start_time: InstantOf<C>,
3231
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
33-
result: Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>,
32+
result: Result<SnapshotResponse<C::NodeId>, StreamingError<C>>,
3433
) -> Self {
3534
Self {
3635
start_time,

openraft/src/replication/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ use crate::config::Config;
2828
use crate::core::notify::Notify;
2929
use crate::core::sm::handle::SnapshotReader;
3030
use crate::display_ext::DisplayOptionExt;
31+
use crate::error::decompose::DecomposeResult;
3132
use crate::error::HigherVote;
3233
use crate::error::PayloadTooLarge;
3334
use crate::error::RPCError;
34-
use crate::error::RaftError;
3535
use crate::error::ReplicationClosed;
3636
use crate::error::ReplicationError;
3737
use crate::error::Timeout;
@@ -449,7 +449,7 @@ where
449449
RPCError::Timeout(to)
450450
})?; // return Timeout error
451451

452-
let append_resp = append_res?;
452+
let append_resp = DecomposeResult::<C, _, _>::decompose_infallible(append_res)?;
453453

454454
tracing::debug!(
455455
req = display(&sending_range),
@@ -496,7 +496,7 @@ where
496496

497497
/// Send the error result to RaftCore.
498498
/// RaftCore will then submit another replication command.
499-
fn send_progress_error(&mut self, request_id: RequestId, err: RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>) {
499+
fn send_progress_error(&mut self, request_id: RequestId, err: RPCError<C::NodeId, C::Node>) {
500500
let _ = self.tx_raft_core.send(Notify::Network {
501501
response: Response::Progress {
502502
target: self.target,
@@ -779,6 +779,8 @@ where
779779
tracing::warn!(error = display(e), "failed to send snapshot");
780780
}
781781

782+
let res = res.decompose_infallible();
783+
782784
if let Some(tx_noty) = weak_tx.upgrade() {
783785
let data = Data::new_snapshot_callback(request_id, start_time, meta, res);
784786
let send_res = tx_noty.send(Replicate::new_data(data));

openraft/src/replication/request.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ where C: RaftTypeConfig
5252
}
5353

5454
use crate::display_ext::DisplayOptionExt;
55-
use crate::error::Fatal;
5655
use crate::error::StreamingError;
5756
use crate::log_id_range::LogIdRange;
5857
use crate::raft::SnapshotResponse;
@@ -151,7 +150,7 @@ where C: RaftTypeConfig
151150
request_id: RequestId,
152151
start_time: InstantOf<C>,
153152
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
154-
result: Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>,
153+
result: Result<SnapshotResponse<C::NodeId>, StreamingError<C>>,
155154
) -> Self {
156155
Self::SnapshotCallback(DataWithId::new(
157156
request_id,

0 commit comments

Comments
 (0)