Skip to content

Commit 362aa5f

Browse files
committed
Advanced reconnection strategy
- added reconnection config with 3 possible strategies: constant, linear or exponential - BREAKING CHANGE: removed client::Config::max_command_attempts fix #66
1 parent b168d8e commit 362aa5f

File tree

11 files changed

+386
-246
lines changed

11 files changed

+386
-246
lines changed

src/client/client.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
};
3434
use futures_channel::{mpsc, oneshot};
3535
use futures_util::Stream;
36-
use log::trace;
36+
use log::{info, trace};
3737
use serde::de::DeserializeOwned;
3838
use std::{
3939
future::IntoFuture,
@@ -269,8 +269,10 @@ impl Client {
269269
fn send_message(&self, message: Message) -> Result<()> {
270270
if let Some(msg_sender) = &self.msg_sender as &Option<MsgSender> {
271271
trace!("Will enqueue message: {message:?}");
272-
msg_sender.unbounded_send(message)?;
273-
Ok(())
272+
Ok(msg_sender.unbounded_send(message).map_err(|e| {
273+
info!("{}", e.to_string());
274+
Error::Client("Disconnected from server".to_string())
275+
})?)
274276
} else {
275277
Err(Error::Client(
276278
"Invalid channel to send messages to the network handler".to_owned(),

src/client/config.rs

Lines changed: 111 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ const DEFAULT_AUTO_RESUBSCRTBE: bool = true;
1313
const DEFAULT_AUTO_REMONITOR: bool = true;
1414
const DEFAULT_KEEP_ALIVE: Option<Duration> = None;
1515
const DEFAULT_NO_DELAY: bool = true;
16-
const DEFAULT_MAX_COMMAND_ATTEMPTS: usize = 3;
1716
const DEFAULT_RETRY_ON_ERROR: bool = false;
1817

1918
type Uri<'a> = (
@@ -84,8 +83,6 @@ pub struct Config {
8483
///
8584
/// See [`TcpStream::set_nodelay`](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_nodelay)
8685
pub no_delay: bool,
87-
/// Maximum number of retry attempts to send a command to the Redis server (default `3`).
88-
pub max_command_attempts: usize,
8986
/// Defines the default strategy for retries on network error (default `false`):
9087
/// * `true` - retry sending the command/batch of commands on network error
9188
/// * `false` - do not retry sending the command/batch of commands on network error
@@ -99,6 +96,8 @@ pub struct Config {
9996
/// * [`Client::send_and_forget`](crate::client::Client::send_and_forget)
10097
/// * [`Client::send_batch`](crate::client::Client::send_batch)
10198
pub retry_on_error: bool,
99+
/// Reconnection configuration (Constant, Linear or Exponential)
100+
pub reconnection: ReconnectionConfig,
102101
}
103102

104103
impl Default for Config {
@@ -117,8 +116,8 @@ impl Default for Config {
117116
connection_name: String::from(""),
118117
keep_alive: DEFAULT_KEEP_ALIVE,
119118
no_delay: DEFAULT_NO_DELAY,
120-
max_command_attempts: DEFAULT_MAX_COMMAND_ATTEMPTS,
121119
retry_on_error: DEFAULT_RETRY_ON_ERROR,
120+
reconnection: Default::default(),
122121
}
123122
}
124123
}
@@ -315,12 +314,6 @@ impl Config {
315314
}
316315
}
317316

318-
if let Some(max_command_attempts) = query.remove("max_command_attempts") {
319-
if let Ok(max_command_attempts) = max_command_attempts.parse::<usize>() {
320-
config.max_command_attempts = max_command_attempts;
321-
}
322-
}
323-
324317
if let Some(retry_on_error) = query.remove("retry_on_error") {
325318
if let Ok(retry_on_error) = retry_on_error.parse::<bool>() {
326319
config.retry_on_error = retry_on_error;
@@ -583,19 +576,6 @@ impl ToString for Config {
583576
s.push_str(&format!("no_delay={}", self.no_delay));
584577
}
585578

586-
if self.max_command_attempts != DEFAULT_MAX_COMMAND_ATTEMPTS {
587-
if !query_separator {
588-
query_separator = true;
589-
s.push('?');
590-
} else {
591-
s.push('&');
592-
}
593-
s.push_str(&format!(
594-
"max_command_attempts={}",
595-
self.max_command_attempts
596-
));
597-
}
598-
599579
if self.retry_on_error != DEFAULT_RETRY_ON_ERROR {
600580
if !query_separator {
601581
query_separator = true;
@@ -874,3 +854,111 @@ impl IntoConfig for Url {
874854
Config::from_uri(self)
875855
}
876856
}
857+
858+
/// The type of reconnection policy to use. This will apply to every connection used by the client.
859+
/// This code has been mostly inpisred by [fred ReconnectPolicy](https://docs.rs/fred/latest/fred/types/enum.ReconnectPolicy.html)
860+
#[derive(Debug, Clone)]
861+
pub enum ReconnectionConfig {
862+
/// Wait a constant amount of time between reconnection attempts, in ms.
863+
Constant {
864+
/// Maximum number of attemps, set `0` to retry forever.
865+
max_attempts: u32,
866+
/// Delay in ms to wait between reconnection attempts
867+
delay: u32,
868+
/// Add jitter in ms to each delay
869+
jitter: u32,
870+
},
871+
/// Backoff reconnection attempts linearly, adding `delay` each time.
872+
Linear {
873+
/// Maximum number of attemps, set `0` to retry forever.
874+
max_attempts: u32,
875+
/// Maximum delay in ms
876+
max_delay: u32,
877+
/// Delay in ms to add to the total waiting time at each attemp
878+
delay: u32,
879+
/// Add jitter in ms to each delay
880+
jitter: u32,
881+
},
882+
/// Backoff reconnection attempts exponentially, multiplying the last delay by `multiplicative_factor` each time.
883+
/// see https://en.wikipedia.org/wiki/Exponential_backoff
884+
Exponential {
885+
/// Maximum number of attemps, set `0` to retry forever.
886+
max_attempts: u32,
887+
/// Minimum delay in ms
888+
min_delay: u32,
889+
/// Maximum delay in ms
890+
max_delay: u32,
891+
// multiplicative factor
892+
multiplicative_factor: u32,
893+
/// Add jitter in ms to each delay
894+
jitter: u32,
895+
},
896+
}
897+
898+
/// The default amount of jitter when waiting to reconnect.
899+
const DEFAULT_JITTER_MS: u32 = 100;
900+
const DEFAULT_DELAY_MS: u32 = 1000;
901+
902+
impl Default for ReconnectionConfig {
903+
fn default() -> Self {
904+
Self::Constant {
905+
max_attempts: 0,
906+
delay: DEFAULT_DELAY_MS,
907+
jitter: DEFAULT_JITTER_MS,
908+
}
909+
}
910+
}
911+
912+
impl ReconnectionConfig {
913+
/// Create a new reconnect policy with a constant backoff.
914+
pub fn new_constant(max_attempts: u32, delay: u32) -> Self {
915+
Self::Constant {
916+
max_attempts,
917+
delay,
918+
jitter: DEFAULT_JITTER_MS,
919+
}
920+
}
921+
922+
/// Create a new reconnect policy with a linear backoff.
923+
pub fn new_linear(max_attempts: u32, max_delay: u32, delay: u32) -> Self {
924+
Self::Linear {
925+
max_attempts,
926+
max_delay,
927+
delay,
928+
jitter: DEFAULT_JITTER_MS,
929+
}
930+
}
931+
932+
/// Create a new reconnect policy with an exponential backoff.
933+
pub fn new_exponential(
934+
max_attempts: u32,
935+
min_delay: u32,
936+
max_delay: u32,
937+
multiplicative_factor: u32,
938+
) -> Self {
939+
Self::Exponential {
940+
max_delay,
941+
max_attempts,
942+
min_delay,
943+
multiplicative_factor,
944+
jitter: DEFAULT_JITTER_MS,
945+
}
946+
}
947+
948+
/// Set the amount of jitter to add to each reconnection delay.
949+
///
950+
/// Default: 100 ms
951+
pub fn set_jitter(&mut self, jitter_ms: u32) {
952+
match self {
953+
Self::Constant { ref mut jitter, .. } => {
954+
*jitter = jitter_ms;
955+
}
956+
Self::Linear { ref mut jitter, .. } => {
957+
*jitter = jitter_ms;
958+
}
959+
Self::Exponential { ref mut jitter, .. } => {
960+
*jitter = jitter_ms;
961+
}
962+
}
963+
}
964+
}

src/client/message.rs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1+
use log::warn;
12
use smallvec::SmallVec;
23

3-
use crate::{resp::Command, PushSender, PubSubSender, RetryReason, network::{ResultSender, ResultsSender}};
4+
use crate::{
5+
network::{ResultSender, ResultsSender},
6+
resp::Command,
7+
Error, PubSubSender, PushSender, RetryReason,
8+
};
49

510
#[cfg(debug_assertions)]
611
use std::sync::atomic::{AtomicUsize, Ordering};
712

813
#[cfg(debug_assertions)]
914
static MESSAGE_SEQUENCE_COUNTER: AtomicUsize = AtomicUsize::new(0);
1015

11-
#[allow(clippy::large_enum_variant)]
16+
#[allow(clippy::large_enum_variant)]
1217
#[derive(Debug)]
1318
pub(crate) enum Commands {
1419
None,
@@ -24,6 +29,26 @@ impl Commands {
2429
Commands::Batch(commands, _) => commands.len(),
2530
}
2631
}
32+
33+
pub fn send_error(self, tag: &str, error: Error) {
34+
match self {
35+
Commands::Single(_, Some(result_sender)) => {
36+
if let Err(e) = result_sender.send(Err(error)) {
37+
warn!(
38+
"[{tag}] Cannot send value to caller because receiver is not there anymore: {e:?}",
39+
);
40+
}
41+
}
42+
Commands::Batch(_, results_sender) => {
43+
if let Err(e) = results_sender.send(Err(error)) {
44+
warn!(
45+
"[{tag}] Cannot send value to caller because receiver is not there anymore: {e:?}",
46+
);
47+
}
48+
}
49+
_ => (),
50+
}
51+
}
2752
}
2853

2954
impl IntoIterator for Commands {
@@ -65,7 +90,7 @@ impl<'a> IntoIterator for &'a mut Commands {
6590
}
6691
}
6792

68-
#[allow(clippy::large_enum_variant)]
93+
#[allow(clippy::large_enum_variant)]
6994
pub enum CommandsIterator {
7095
Single(Option<Command>),
7196
Batch(std::vec::IntoIter<Command>),
@@ -123,7 +148,7 @@ pub(crate) struct Message {
123148
pub retry_on_error: bool,
124149
#[cfg(debug_assertions)]
125150
#[allow(unused)]
126-
pub (crate) message_seq: usize,
151+
pub(crate) message_seq: usize,
127152
}
128153

129154
impl Message {
@@ -154,7 +179,11 @@ impl Message {
154179
}
155180

156181
#[inline(always)]
157-
pub fn batch(commands: Vec<Command>, results_sender: ResultsSender, retry_on_error: bool) -> Self {
182+
pub fn batch(
183+
commands: Vec<Command>,
184+
results_sender: ResultsSender,
185+
retry_on_error: bool,
186+
) -> Self {
158187
Message {
159188
commands: Commands::Batch(commands, results_sender),
160189
pub_sub_senders: None,
@@ -184,11 +213,7 @@ impl Message {
184213
}
185214

186215
#[inline(always)]
187-
pub fn monitor(
188-
command: Command,
189-
result_sender: ResultSender,
190-
push_sender: PushSender,
191-
) -> Self {
216+
pub fn monitor(command: Command, result_sender: ResultSender, push_sender: PushSender) -> Self {
192217
Message {
193218
commands: Commands::Single(command, Some(result_sender)),
194219
pub_sub_senders: None,

src/error.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use crate::{client::Message, Result};
1+
use crate::Result;
22
use futures_channel::{
3-
mpsc::{self, TrySendError},
3+
mpsc::{self},
44
oneshot,
55
};
66
use smallvec::SmallVec;
@@ -102,12 +102,6 @@ impl From<std::io::Error> for Error {
102102
}
103103
}
104104

105-
impl From<TrySendError<Message>> for Error {
106-
fn from(e: TrySendError<Message>) -> Self {
107-
Error::Client(e.to_string())
108-
}
109-
}
110-
111105
impl From<oneshot::Canceled> for Error {
112106
fn from(e: oneshot::Canceled) -> Self {
113107
Error::Client(e.to_string())

src/network/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod cluster_connection;
33
mod command_info_manager;
44
mod connection;
55
mod network_handler;
6+
mod reconnection_state;
67
mod sentinel_connection;
78
mod standalone_connection;
89
mod util;
@@ -13,6 +14,7 @@ pub(crate) use cluster_connection::*;
1314
pub(crate) use command_info_manager::*;
1415
pub(crate) use connection::*;
1516
pub(crate) use network_handler::*;
17+
pub(crate) use reconnection_state::*;
1618
pub(crate) use sentinel_connection::*;
1719
pub(crate) use standalone_connection::*;
1820
pub(crate) use version::*;

0 commit comments

Comments
 (0)