Skip to content

Commit a025007

Browse files
authored
Replace unmaintained crates (#248)
* Replace backoff with backon * Update min lockfile * Update MSRV lockfile * Replace paste with pastey * Update lockfiles * Rename Retry constructors * Remove clone from a copy type * Replace deprecated method * Expose is_read only behind the new feature * Inline variables into format strings
1 parent 2fe8578 commit a025007

24 files changed

+1157
-753
lines changed

ci/Cargo.lock.min

Lines changed: 467 additions & 316 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ci/Cargo.lock.msrv

Lines changed: 443 additions & 292 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ unstable-bolt-protocol-impl-v2 = [
2828
]
2929

3030
[dependencies]
31-
backoff = { version = "0.4.0", features = ["tokio"] }
31+
backon = { version = "1.5.1", default-features = false, features = [
32+
"tokio-sleep",
33+
] }
3234
bytes = { version = "1.5.0", features = ["serde"] }
3335
chrono-tz = "0.10.0"
3436
dashmap = "6.1.0"
@@ -37,7 +39,7 @@ futures = { version = "0.3.0" }
3739
log = "0.4.0"
3840
nav-types = { version = "0.5.2", optional = true }
3941
neo4rs-macros = { version = "0.3.0", path = "../macros" }
40-
paste = "1.0.0"
42+
pastey = "0.1.0"
4143
pin-project-lite = "0.2.9"
4244
rustls = { version = "0.23.16", features = ["tls12", "ring"] }
4345
rustls-native-certs = "0.7.1"

lib/src/bolt/structs/datetime.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::str::FromStr;
22

3+
use chrono::Offset;
34
use serde::de::{Deserialize, Deserializer};
45

56
use super::de::{impl_visitor, impl_visitor_ref};
@@ -70,9 +71,11 @@ impl<'de> DateTimeZoneIdRef<'de> {
7071
/// If the value could not be parsed or is unknown, None is returned.
7172
pub fn timezone_offset_seconds(&self) -> Option<i32> {
7273
let tz = chrono_tz::Tz::from_str(self.tz_id).ok()?;
73-
let offset =
74-
chrono::TimeZone::offset_from_utc_datetime(&tz, &chrono::NaiveDateTime::UNIX_EPOCH);
75-
let offset = chrono::Offset::fix(&offset);
74+
let offset = chrono::DateTime::UNIX_EPOCH
75+
.with_timezone(&tz)
76+
.fixed_offset()
77+
.offset()
78+
.fix();
7679
Some(offset.local_minus_utc())
7780
}
7881

@@ -191,9 +194,11 @@ impl<'de> LegacyDateTimeZoneIdRef<'de> {
191194
/// If the value could not be parsed or is unknown, None is returned.
192195
pub fn timezone_offset_seconds(&self) -> Option<i32> {
193196
let tz = chrono_tz::Tz::from_str(self.tz_id).ok()?;
194-
let offset =
195-
chrono::TimeZone::offset_from_utc_datetime(&tz, &chrono::NaiveDateTime::UNIX_EPOCH);
196-
let offset = chrono::Offset::fix(&offset);
197+
let offset = chrono::DateTime::UNIX_EPOCH
198+
.with_timezone(&tz)
199+
.fixed_offset()
200+
.offset()
201+
.fix();
197202
Some(offset.local_minus_utc())
198203
}
199204

@@ -320,9 +325,11 @@ impl DateTimeZoneId {
320325
/// If the value could not be parsed or is unknown, None is returned.
321326
pub fn timezone_offset_seconds(&self) -> Option<i32> {
322327
let tz = chrono_tz::Tz::from_str(&self.tz_id).ok()?;
323-
let offset =
324-
chrono::TimeZone::offset_from_utc_datetime(&tz, &chrono::NaiveDateTime::UNIX_EPOCH);
325-
let offset = chrono::Offset::fix(&offset);
328+
let offset = chrono::DateTime::UNIX_EPOCH
329+
.with_timezone(&tz)
330+
.fixed_offset()
331+
.offset()
332+
.fix();
326333
Some(offset.local_minus_utc())
327334
}
328335

@@ -368,9 +375,11 @@ impl LegacyDateTimeZoneId {
368375
/// If the value could not be parsed or is unknown, None is returned.
369376
pub fn timezone_offset_seconds(&self) -> Option<i32> {
370377
let tz = chrono_tz::Tz::from_str(&self.tz_id).ok()?;
371-
let offset =
372-
chrono::TimeZone::offset_from_utc_datetime(&tz, &chrono::NaiveDateTime::UNIX_EPOCH);
373-
let offset = chrono::Offset::fix(&offset);
378+
let offset = chrono::DateTime::UNIX_EPOCH
379+
.with_timezone(&tz)
380+
.fixed_offset()
381+
.offset()
382+
.fix();
374383
Some(offset.local_minus_utc())
375384
}
376385

lib/src/connection.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl Connection {
9090
let mut response = [0, 0, 0, 0];
9191
stream.read_exact(&mut response).await?;
9292
let version = Version::parse(response)?;
93-
info!("Connected to Neo4j with version {}", version);
93+
info!("Connected to Neo4j with version {version}");
9494
Ok(version)
9595
}
9696

@@ -293,10 +293,10 @@ impl Display for Routing {
293293
Routing::Yes(routing) => {
294294
let routing = routing
295295
.iter()
296-
.map(|(k, v)| format!("{}: \"{}\"", k, v))
296+
.map(|(k, v)| format!("{k}: \"{v}\""))
297297
.collect::<Vec<_>>()
298298
.join(", ");
299-
write!(f, "{}", routing)
299+
write!(f, "{routing}")
300300
}
301301
}
302302
}
@@ -503,7 +503,7 @@ impl NeoUrl {
503503
Ok(url) if url.has_host() => url,
504504
// missing scheme
505505
Ok(_) | Err(url::ParseError::RelativeUrlWithoutBase) => {
506-
Url::parse(&format!("bolt://{}", uri))?
506+
Url::parse(&format!("bolt://{uri}"))?
507507
}
508508
Err(err) => return Err(Error::UrlParseError(err)),
509509
};

lib/src/graph.rs

Lines changed: 32 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use {
1010

1111
use crate::graph::ConnectionPoolManager::Direct;
1212
use crate::pool::ManagedConnection;
13+
use crate::query::RetryableQuery;
14+
use crate::retry::Retry;
1315
use crate::RunResult;
1416
use crate::{
1517
config::{Config, ConfigBuilder, Database, LiveConfig},
@@ -20,27 +22,27 @@ use crate::{
2022
txn::Txn,
2123
Operation,
2224
};
23-
use backoff::{Error, ExponentialBackoff};
25+
use backon::{ExponentialBuilder, RetryableWithContext};
2426
use std::time::Duration;
2527

2628
#[derive(Clone)]
27-
enum ConnectionPoolManager {
29+
pub(crate) enum ConnectionPoolManager {
2830
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
2931
Routed(RoutedConnectionManager),
3032
Direct(ConnectionPool),
3133
}
3234

3335
impl ConnectionPoolManager {
3436
#[allow(unused_variables)]
35-
async fn get(&self, operation: Option<Operation>) -> Result<ManagedConnection> {
37+
pub(crate) async fn get(&self, operation: Option<Operation>) -> Result<ManagedConnection> {
3638
match self {
3739
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
3840
Routed(manager) => manager.get(operation).await,
3941
Direct(pool) => pool.get().await.map_err(crate::Error::from),
4042
}
4143
}
4244

43-
fn backoff(&self) -> ExponentialBackoff {
45+
fn backoff(&self) -> ExponentialBuilder {
4446
match self {
4547
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
4648
Routed(manager) => manager.backoff(),
@@ -164,7 +166,7 @@ impl Graph {
164166
operation: Operation,
165167
bookmarks: &[String],
166168
) -> Result<Txn> {
167-
let connection = self.pool.get(Some(operation.clone())).await?;
169+
let connection = self.pool.get(Some(operation)).await?;
168170
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
169171
{
170172
Txn::new(db, self.config.fetch_size, connection, operation, bookmarks).await
@@ -222,29 +224,18 @@ impl Graph {
222224
async fn impl_run_on(
223225
&self,
224226
db: Option<Database>,
225-
q: Query,
227+
query: Query,
226228
operation: Operation,
227229
) -> Result<RunResult> {
228-
let is_read = operation.is_read();
229-
let result = backoff::future::retry_notify(
230-
self.pool.backoff(),
231-
|| {
232-
let pool = &self.pool;
233-
let mut query = q.clone();
234-
let operation = operation.clone();
235-
if let Some(db) = db.as_deref() {
236-
query = query.extra("db", db);
237-
}
238-
query = query.extra("mode", if is_read { "r" } else { "w" });
239-
async move {
240-
let mut connection =
241-
pool.get(Some(operation)).await.map_err(Error::Permanent)?; // an error when retrieving a connection is considered permanent
242-
query.run_retryable(&mut connection).await
243-
}
244-
},
245-
Self::log_retry,
246-
)
247-
.await;
230+
let query = query.into_retryable(db, operation, &self.pool, None);
231+
232+
let (query, result) = RetryableQuery::retry_run
233+
.retry(self.pool.backoff())
234+
.sleep(tokio::time::sleep)
235+
.context(query)
236+
.when(|e| matches!(e, Retry::Yes(_)))
237+
.notify(Self::log_retry)
238+
.await;
248239

249240
match result {
250241
Ok(result) => {
@@ -257,7 +248,7 @@ impl Graph {
257248
}
258249
Direct(_) => {}
259250
}
260-
} else if is_read {
251+
} else if query.is_read() {
261252
match &self.pool {
262253
Routed(routed) => {
263254
debug!("No bookmark received after a read operation, discarding all bookmarks");
@@ -269,7 +260,7 @@ impl Graph {
269260
}
270261
Ok(result)
271262
}
272-
Err(e) => Err(e),
263+
Err(e) => Err(e.into_inner()),
273264
}
274265
}
275266

@@ -331,32 +322,23 @@ impl Graph {
331322
async fn impl_execute_on(
332323
&self,
333324
db: Option<Database>,
334-
q: Query,
325+
query: Query,
335326
operation: Operation,
336327
) -> Result<DetachedRowStream> {
337-
backoff::future::retry_notify(
338-
self.pool.backoff(),
339-
|| {
340-
let pool = &self.pool;
341-
let mut query = q.clone();
342-
let operation = operation.clone();
343-
let fetch_size = self.config.fetch_size;
344-
if let Some(db) = db.as_deref() {
345-
query = query.extra("db", db);
346-
}
347-
let operation = operation.clone();
348-
query = query.param("mode", if operation.is_read() { "r" } else { "w" });
349-
async move {
350-
let connection = pool.get(Some(operation)).await.map_err(Error::Permanent)?; // an error when retrieving a connection is considered permanent
351-
query.execute_retryable(fetch_size, connection).await
352-
}
353-
},
354-
Self::log_retry,
355-
)
356-
.await
328+
let query = query.into_retryable(db, operation, &self.pool, Some(self.config.fetch_size));
329+
330+
let (query, result) = RetryableQuery::retry_execute
331+
.retry(self.pool.backoff())
332+
.sleep(tokio::time::sleep)
333+
.context(query)
334+
.when(|e| matches!(e, Retry::Yes(_)))
335+
.notify(Self::log_retry)
336+
.await;
337+
338+
result.map_err(Retry::into_inner)
357339
}
358340

359-
fn log_retry(e: crate::Error, delay: Duration) {
341+
fn log_retry(e: &Retry<crate::Error>, delay: Duration) {
360342
let level = match delay.as_millis() {
361343
0..=499 => log::Level::Debug,
362344
500..=4999 => log::Level::Info,

lib/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,7 @@ mod messages;
497497
mod packstream;
498498
mod pool;
499499
mod query;
500+
mod retry;
500501
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
501502
mod routing;
502503
mod row;
@@ -530,7 +531,7 @@ pub use crate::version::Version;
530531
pub(crate) use messages::Success;
531532
use std::fmt::Display;
532533

533-
#[derive(Debug, PartialEq, Clone)]
534+
#[derive(Debug, PartialEq, Clone, Copy)]
534535
pub enum Operation {
535536
Read,
536537
Write,

lib/src/messages.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ impl BoltResponse {
237237
"unknown message ".to_owned(),
238238
|mut output, byte| {
239239
use std::fmt::Write;
240-
let _ = write!(output, "{:02X}", byte);
240+
let _ = write!(output, "{byte:02X}");
241241
output
242242
},
243243
)))
@@ -247,7 +247,7 @@ impl BoltResponse {
247247
match self {
248248
BoltResponse::Failure(failure) => Error::Neo4j(failure.into_error()),
249249
BoltResponse::Ignore(ignore) => Error::Neo4j(ignore.into_error()),
250-
_ => Error::UnexpectedMessage(format!("unexpected response for {}: {:?}", msg, self)),
250+
_ => Error::UnexpectedMessage(format!("unexpected response for {msg}: {self:?}")),
251251
}
252252
}
253253
}

lib/src/pool.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
connection::{Connection, ConnectionInfo},
77
errors::{Error, Result},
88
};
9-
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
9+
use backon::ExponentialBuilder;
1010
use deadpool::managed::{Manager, Metrics, Object, Pool, RecycleResult};
1111
use log::{info, trace};
1212

@@ -15,7 +15,7 @@ pub type ManagedConnection = Object<ConnectionManager>;
1515

1616
pub struct ConnectionManager {
1717
info: ConnectionInfo,
18-
backoff: ExponentialBackoff,
18+
backoff: ExponentialBuilder,
1919
}
2020

2121
impl ConnectionManager {
@@ -26,20 +26,25 @@ impl ConnectionManager {
2626
tls_config: &ConnectionTLSConfig,
2727
) -> Result<Self> {
2828
let info = ConnectionInfo::new(uri, user, password, tls_config)?;
29-
let backoff = ExponentialBackoffBuilder::new()
30-
.with_initial_interval(Duration::from_millis(1))
31-
.with_randomization_factor(0.42)
32-
.with_multiplier(2.0)
33-
.with_max_elapsed_time(Some(Duration::from_secs(60)))
34-
.build();
29+
let backoff = backoff();
3530
Ok(ConnectionManager { info, backoff })
3631
}
3732

38-
pub fn backoff(&self) -> ExponentialBackoff {
39-
self.backoff.clone()
33+
pub fn backoff(&self) -> ExponentialBuilder {
34+
self.backoff
4035
}
4136
}
4237

38+
pub(crate) fn backoff() -> ExponentialBuilder {
39+
ExponentialBuilder::new()
40+
.with_jitter()
41+
.with_factor(2.0)
42+
.without_max_times()
43+
.with_min_delay(Duration::from_millis(1))
44+
.with_max_delay(Duration::from_secs(10))
45+
.with_total_delay(Some(Duration::from_secs(60)))
46+
}
47+
4348
impl Manager for ConnectionManager {
4449
type Type = Connection;
4550
type Error = Error;

0 commit comments

Comments
 (0)