Skip to content

Commit 29b2b5f

Browse files
committed
fix infinite loop in cluster mode when loosing connection to a node
1 parent e5a1e0c commit 29b2b5f

File tree

3 files changed

+32
-9
lines changed

3 files changed

+32
-9
lines changed

src/commands/cluster_commands.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -761,11 +761,12 @@ pub struct ClusterNodeResult {
761761
}
762762

763763
/// Cluster health status for the [`cluster_shards`](ClusterCommands::cluster_shards) command.
764-
#[derive(Debug, Deserialize)]
764+
#[derive(Debug, Deserialize, PartialEq, Eq)]
765765
#[serde(rename_all = "lowercase")]
766766
pub enum ClusterHealthStatus {
767767
Online,
768768
Failed,
769+
Fail,
769770
Loading,
770771
}
771772

src/network/cluster_connection.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,10 @@ impl ClusterConnection {
509509
let read_futures = self.nodes.iter_mut().map(|n| n.connection.read().boxed());
510510
let (result, node_idx, _) = future::select_all(read_futures).await;
511511

512+
if result.is_none() {
513+
return None;
514+
}
515+
512516
if let Some(Ok(bytes)) = &result {
513517
if bytes.is_push_message() {
514518
return result;
@@ -940,7 +944,10 @@ impl ClusterConnection {
940944
let mut slot_ranges = Vec::<SlotRange>::new();
941945

942946
for shard_info in shard_info_list.into_iter() {
943-
let Some(master_info) = shard_info.nodes.into_iter().find(|n| n.role == "master")
947+
let Some(master_info) = shard_info
948+
.nodes
949+
.into_iter()
950+
.find(|n| n.role == "master" && n.health == ClusterHealthStatus::Online)
944951
else {
945952
return Err(Error::Client("Cluster misconfiguration".to_owned()));
946953
};

src/tests/cluster.rs

+22-7
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,14 @@ use crate::{
66
ClusterShardResult, ConnectionCommands, FlushingMode, GenericCommands, HelloOptions,
77
MigrateOptions, ScriptingCommands, ServerCommands, StringCommands,
88
},
9-
network::{Version, ClusterConnection},
9+
network::{ClusterConnection, Version},
1010
sleep, spawn,
1111
tests::{get_cluster_test_client, get_cluster_test_client_with_command_timeout},
1212
Error, RedisError, RedisErrorKind, Result,
1313
};
14-
use serial_test::serial;
15-
use std::{
16-
collections::HashSet,
17-
future::IntoFuture,
18-
};
1914
use futures_util::try_join;
15+
use serial_test::serial;
16+
use std::{collections::HashSet, future::IntoFuture, time::Duration};
2017

2118
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
2219
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
@@ -464,4 +461,22 @@ async fn commands_to_different_nodes() -> Result<()> {
464461
assert_eq!("1", val1);
465462
assert_eq!("2", val2);
466463
Ok(())
467-
}
464+
}
465+
466+
/// test reconnection to replica when master is stopped
467+
/// master stop is not automated but must be done manually
468+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
469+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
470+
#[serial]
471+
#[ignore]
472+
async fn get_loop() -> Result<()> {
473+
let client = get_cluster_test_client().await?;
474+
client.set("key", "value").await?;
475+
476+
for _ in 0..1000 {
477+
let _value: Result<String> = client.get("key").await;
478+
sleep(Duration::from_secs(1)).await;
479+
}
480+
481+
Ok(())
482+
}

0 commit comments

Comments
 (0)