Skip to content

Commit 6748145

Browse files
authored
Support multiple routing tables (one per database) (#244)
* Keep a map of registries to store a routing table per database * Small changes * Refresh table when empty * Use empty string for default db * Use the new fetch single table command * Do not lock bookmarks * Fix endless loop * Create a pool for each server * Fix server removal * Get the pool from the pool map given the server instance * Fix load balancing strategy for multi database support * When servers list is empty for a db/operation, always force a RT refresh * Maintain the global ttl for refreshing routing tables, even if we ask for routing tables different from the one used in the connect configuration * Fix server ordering to ensure consistent load balancer outcome * Fix compilation issue after rebase with main * Use error level when refreshing routing table fails
1 parent a025007 commit 6748145

File tree

8 files changed

+588
-167
lines changed

8 files changed

+588
-167
lines changed

lib/src/connection.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -448,19 +448,18 @@ impl ConnectionInfo {
448448
tls_config: &ConnectionTLSConfig,
449449
) -> Result<(TlsConnector, ServerName<'static>)> {
450450
let mut root_cert_store = RootCertStore::empty();
451-
match rustls_native_certs::load_native_certs() {
452-
Ok(certs) => {
453-
root_cert_store.add_parsable_certificates(certs);
454-
}
455-
Err(e) => {
456-
warn!("Failed to load native certificates: {e}");
457-
}
458-
}
459451

460452
let builder = ClientConfig::builder();
461453
let config = match tls_config {
462454
ConnectionTLSConfig::None => {
463-
warn!("TLS config set to None but required from the URI. Using default config.");
455+
match rustls_native_certs::load_native_certs() {
456+
Ok(certs) => {
457+
root_cert_store.add_parsable_certificates(certs);
458+
}
459+
Err(e) => {
460+
warn!("Failed to load native certificates: {e}");
461+
}
462+
}
464463
builder
465464
.with_root_certificates(root_cert_store)
466465
.with_no_client_auth()

lib/src/graph.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,14 @@ pub(crate) enum ConnectionPoolManager {
3434

3535
impl ConnectionPoolManager {
3636
#[allow(unused_variables)]
37-
pub(crate) async fn get(&self, operation: Option<Operation>) -> Result<ManagedConnection> {
37+
pub(crate) async fn get(
38+
&self,
39+
operation: Option<Operation>,
40+
db: Option<Database>,
41+
) -> Result<ManagedConnection> {
3842
match self {
3943
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
40-
Routed(manager) => manager.get(operation).await,
44+
Routed(manager) => manager.get(operation, db).await,
4145
Direct(pool) => pool.get().await.map_err(crate::Error::from),
4246
}
4347
}
@@ -166,7 +170,7 @@ impl Graph {
166170
operation: Operation,
167171
bookmarks: &[String],
168172
) -> Result<Txn> {
169-
let connection = self.pool.get(Some(operation)).await?;
173+
let connection = self.pool.get(Some(operation), db.clone()).await?;
170174
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
171175
{
172176
Txn::new(db, self.config.fetch_size, connection, operation, bookmarks).await

lib/src/pool.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ pub fn create_pool(config: &Config) -> Result<ConnectionPool> {
6868
&config.tls_config,
6969
)?;
7070
info!(
71-
"creating connection pool with max size {}",
72-
config.max_connections
71+
"creating connection pool for node {} with max size {}",
72+
config.uri, config.max_connections
7373
);
7474
Ok(ConnectionPool::builder(mgr)
7575
.max_size(config.max_connections)

lib/src/query.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ impl Query {
117117
query,
118118
operation,
119119
fetch_size,
120+
db,
120121
}
121122
}
122123

@@ -233,6 +234,7 @@ pub(crate) struct RetryableQuery<'a> {
233234
query: Query,
234235
operation: Operation,
235236
fetch_size: Option<usize>,
237+
db: Option<Database>,
236238
}
237239

238240
impl<'a> RetryableQuery<'a> {
@@ -270,7 +272,10 @@ impl<'a> RetryableQuery<'a> {
270272

271273
async fn connect(&self) -> QueryResult<ManagedConnection> {
272274
// an error when retrieving a connection is considered permanent
273-
self.pool.get(Some(self.operation)).await.map_err(Retry::No)
275+
self.pool
276+
.get(Some(self.operation), self.db.clone())
277+
.await
278+
.map_err(Retry::No)
274279
}
275280
}
276281

0 commit comments

Comments
 (0)