Skip to content

Commit 97b71be

Browse files
committed
When servers list is empty for a db/operation, always force a RT refresh
1 parent 342c638 commit 97b71be

File tree

4 files changed

+47
-53
lines changed

4 files changed

+47
-53
lines changed

lib/src/pool.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ pub fn create_pool(config: &Config) -> Result<ConnectionPool> {
6464
)?;
6565
info!(
6666
"creating connection pool for node {} with max size {}",
67-
config.uri,
68-
config.max_connections
67+
config.uri, config.max_connections
6968
);
7069
Ok(ConnectionPool::builder(mgr)
7170
.max_size(config.max_connections)

lib/src/routing/connection_registry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl BoltServer {
3636
})
3737
.collect()
3838
}
39-
39+
4040
pub fn has_same_address(&self, other: &Self) -> bool {
4141
self.address == other.address && self.port == other.port
4242
}

lib/src/routing/load_balancing/round_robin_strategy.rs

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ impl RoundRobinStrategy {
1818
}
1919
}
2020

21-
fn select(all_servers: &[BoltServer], servers: &[BoltServer], index: &AtomicUsize) -> Option<BoltServer> {
21+
fn select(
22+
all_servers: &[BoltServer],
23+
servers: &[BoltServer],
24+
index: &AtomicUsize,
25+
) -> Option<BoltServer> {
2226
if servers.is_empty() {
2327
return None;
2428
}
@@ -28,12 +32,8 @@ impl RoundRobinStrategy {
2832
if used.len() >= all_servers.len() {
2933
return None; // All servers have been used
3034
}
31-
let _ = index.compare_exchange(
32-
0,
33-
all_servers.len(),
34-
Ordering::Relaxed,
35-
Ordering::Relaxed,
36-
);
35+
let _ =
36+
index.compare_exchange(0, all_servers.len(), Ordering::Relaxed, Ordering::Relaxed);
3737
let i = index.fetch_sub(1, Ordering::Relaxed);
3838
if let Some(server) = all_servers.get(i - 1) {
3939
if servers.contains(server) {
@@ -52,7 +52,9 @@ impl LoadBalancingStrategy for RoundRobinStrategy {
5252
.filter(|s| s.role == "READ")
5353
.cloned()
5454
.collect::<Vec<BoltServer>>();
55-
let all_readers = self.connection_registry.all_servers()
55+
let all_readers = self
56+
.connection_registry
57+
.all_servers()
5658
.iter()
5759
.filter(|s| s.role == "READ")
5860
.cloned()
@@ -67,7 +69,9 @@ impl LoadBalancingStrategy for RoundRobinStrategy {
6769
.filter(|s| s.role == "WRITE")
6870
.cloned()
6971
.collect::<Vec<BoltServer>>();
70-
let all_writers = self.connection_registry.all_servers()
72+
let all_writers = self
73+
.connection_registry
74+
.all_servers()
7175
.iter()
7276
.filter(|s| s.role == "WRITE")
7377
.cloned()
@@ -88,24 +92,30 @@ mod tests {
8892
addresses: vec!["server1:7687".to_string()],
8993
role: "ROUTE".to_string(),
9094
}];
91-
let readers1 = vec![Server {
92-
addresses: vec!["server1:7687".to_string()],
93-
role: "READ".to_string(),
94-
}, Server {
95-
addresses: vec!["server2:7687".to_string()],
96-
role: "READ".to_string(),
97-
}];
95+
let readers1 = vec![
96+
Server {
97+
addresses: vec!["server1:7687".to_string()],
98+
role: "READ".to_string(),
99+
},
100+
Server {
101+
addresses: vec!["server2:7687".to_string()],
102+
role: "READ".to_string(),
103+
},
104+
];
98105
let writers1 = vec![Server {
99106
addresses: vec!["server4:7687".to_string()],
100107
role: "WRITE".to_string(),
101108
}];
102-
let readers2 = vec![Server {
103-
addresses: vec!["server1:7687".to_string()],
104-
role: "READ".to_string(),
105-
}, Server {
106-
addresses: vec!["server3:7687".to_string()],
107-
role: "READ".to_string(),
108-
}];
109+
let readers2 = vec![
110+
Server {
111+
addresses: vec!["server1:7687".to_string()],
112+
role: "READ".to_string(),
113+
},
114+
Server {
115+
addresses: vec!["server3:7687".to_string()],
116+
role: "READ".to_string(),
117+
},
118+
];
109119

110120
let writers2 = vec![Server {
111121
addresses: vec!["server4:7687".to_string()],
@@ -157,22 +167,16 @@ mod tests {
157167
let strategy = RoundRobinStrategy::new(registry.clone());
158168

159169
// select a reader for db-1
160-
let reader = RoundRobinStrategy::select(&all_readers, &servers1, &strategy.reader_index).unwrap();
161-
assert_eq!(
162-
reader.address,
163-
"server2"
164-
);
170+
let reader =
171+
RoundRobinStrategy::select(&all_readers, &servers1, &strategy.reader_index).unwrap();
172+
assert_eq!(reader.address, "server2");
165173
// select a reader for db-2
166-
let reader = RoundRobinStrategy::select(&all_readers, &servers2, &strategy.reader_index).unwrap();
167-
assert_eq!(
168-
reader.address,
169-
"server1"
170-
);
174+
let reader =
175+
RoundRobinStrategy::select(&all_readers, &servers2, &strategy.reader_index).unwrap();
176+
assert_eq!(reader.address, "server1");
171177
// select another reader for db-1
172-
let reader = RoundRobinStrategy::select(&all_readers, &servers1, &strategy.reader_index).unwrap();
173-
assert_eq!(
174-
reader.address,
175-
"server2"
176-
);
178+
let reader =
179+
RoundRobinStrategy::select(&all_readers, &servers1, &strategy.reader_index).unwrap();
180+
assert_eq!(reader.address, "server2");
177181
}
178182
}

lib/src/routing/routed_connection_manager.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,9 @@ impl RoutedConnectionManager {
116116
continue;
117117
}
118118
}
119-
} else {
120-
// We couldn't find a connection manager for the server, it was probably marked unavailable
121-
error!(
122-
"No connection manager available for router `{}` in the registry",
123-
server.address
124-
);
125-
return Err(Error::ServerUnavailableError(format!(
126-
"No connection manager available for router `{}` in the registry",
127-
server.address
128-
)));
129119
}
130120
}
131-
debug!("Routing table is empty for requested {op} operation, forcing refresh");
121+
debug!("No connection for requested {op} operation, forcing refresh of the routing table for database `{}`", db.as_deref().unwrap_or("default"));
132122
self.channel
133123
.send(RegistryCommand::RefreshSingleTable((
134124
db.clone(),
@@ -143,7 +133,8 @@ impl RoutedConnectionManager {
143133
})?;
144134
// table is not empty, but we couldn't get a connection, so we throw an error
145135
break Err(Error::ServerUnavailableError(format!(
146-
"No server available for {op} operation"
136+
"No server available for {op} operation on db `{}`",
137+
db.as_deref().unwrap_or("default")
147138
)));
148139
}
149140
}

0 commit comments

Comments
 (0)