Skip to content

Commit 964b672

Browse files
committed
rewrite PgListener recv and try_recv methods
1 parent 59e3ca5 commit 964b672

File tree

1 file changed

+54
-32
lines changed

1 file changed

+54
-32
lines changed

sqlx-postgres/src/listener.rs

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::fmt::{self, Debug};
2-
use std::io;
32
use std::str::from_utf8;
43

54
use futures_channel::mpsc;
@@ -197,6 +196,30 @@ impl PgListener {
197196
Ok(self.connection.as_mut().unwrap())
198197
}
199198

199+
// same as `connection` but if fails to connect retries 5 times
200+
#[inline]
201+
async fn connection_with_recovery(&mut self) -> Result<&mut PgConnection, Error> {
202+
// retry max 5 times with these backoff durations
203+
let backoff_times = [0, 100, 1000, 2000, 10_000]; // ms
204+
205+
let mut last_err = None;
206+
for backoff_ms in backoff_times {
207+
match self.connect_if_needed().await {
208+
Ok(()) => return Ok(self.connection.as_mut().unwrap()),
209+
Err(err @ Error::Io(_)) => {
210+
last_err = Some(err);
211+
212+
crate::rt::sleep(std::time::Duration::from_millis(backoff_ms)).await;
213+
continue;
214+
},
215+
Err(other) => return Err(other),
216+
}
217+
}
218+
219+
// if 5 retries later still got IO error, return the last one and stop
220+
Err(last_err.unwrap())
221+
}
222+
200223
/// Receives the next notification available from any of the subscribed channels.
201224
///
202225
/// If the connection to PostgreSQL is lost, it is automatically reconnected on the next
@@ -258,62 +281,61 @@ impl PgListener {
258281
///
259282
/// [`eager_reconnect`]: PgListener::eager_reconnect
260283
pub async fn try_recv(&mut self) -> Result<Option<PgNotification>, Error> {
284+
match self.recv_without_recovery().await {
285+
Ok(notification) => return Ok(Some(notification)),
286+
287+
// The connection is dead, ensure that it is dropped,
288+
// update self state, and loop to try again.
289+
Err(Error::Io(_)) => {
290+
if let Some(mut conn) = self.connection.take() {
291+
self.buffer_tx = conn.inner.stream.notifications.take();
292+
// Close the connection in a background task, so we can continue.
293+
conn.close_on_drop();
294+
}
295+
296+
if self.eager_reconnect {
297+
self.connection_with_recovery().await?;
298+
}
299+
300+
// lost connection
301+
return Ok(None);
302+
},
303+
Err(e) => return Err(e),
304+
}
305+
}
306+
307+
async fn recv_without_recovery(&mut self) -> Result<PgNotification, Error> {
261308
// Flush the buffer first, if anything
262309
// This would only fill up if this listener is used as a connection
263310
if let Some(notification) = self.next_buffered() {
264-
return Ok(Some(notification));
311+
return Ok(notification);
265312
}
266313

267314
// Fetch our `CloseEvent` listener, if applicable.
268315
let mut close_event = (!self.ignore_close_event).then(|| self.pool.close_event());
269316

270317
loop {
271-
let next_message = dbg!(self.connection().await)?.inner.stream.recv_unchecked();
318+
let next_message = self.connection().await?.inner.stream.recv_unchecked();
272319

273320
let res = if let Some(ref mut close_event) = close_event {
274321
// cancels the wait and returns `Err(PoolClosed)` if the pool is closed
275322
// before `next_message` returns, or if the pool was already closed
276-
dbg!(close_event.do_until(next_message).await)?
323+
close_event.do_until(next_message).await?
277324
} else {
278325
next_message.await
279326
};
280327

281-
let message = match res {
282-
Ok(message) => message,
283-
284-
// The connection is dead, ensure that it is dropped,
285-
// update self state, and loop to try again.
286-
Err(Error::Io(err)) =>
287-
{
288-
if let Some(mut conn) = self.connection.take() {
289-
self.buffer_tx = conn.inner.stream.notifications.take();
290-
// Close the connection in a background task, so we can continue.
291-
conn.close_on_drop();
292-
}
293-
294-
if self.eager_reconnect {
295-
dbg!(self.connect_if_needed().await)?;
296-
}
297-
298-
// lost connection
299-
return Ok(None);
300-
}
301-
302-
// Forward other errors
303-
Err(error) => {
304-
return Err(error);
305-
}
306-
};
328+
let message = res?;
307329

308330
match message.format {
309331
// We've received an async notification, return it.
310332
BackendMessageFormat::NotificationResponse => {
311-
return Ok(Some(PgNotification(message.decode()?)));
333+
return Ok(PgNotification(message.decode()?));
312334
}
313335

314336
// Mark the connection as ready for another query
315337
BackendMessageFormat::ReadyForQuery => {
316-
dbg!(self.connection().await)?.inner.pending_ready_for_query_count -= 1;
338+
self.connection().await?.inner.pending_ready_for_query_count -= 1;
317339
}
318340

319341
// Ignore unexpected messages

0 commit comments

Comments
 (0)