Skip to content
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 51 additions & 37 deletions sqlx-postgres/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fmt::{self, Debug};
use std::io;
use std::str::from_utf8;

use futures_channel::mpsc;
Expand Down Expand Up @@ -197,6 +196,30 @@ impl PgListener {
Ok(self.connection.as_mut().unwrap())
}

// same as `connection` but if fails to connect retries 5 times
#[inline]
async fn connection_with_recovery(&mut self) -> Result<&mut PgConnection, Error> {
// retry max 5 times with these backoff durations
let backoff_times = [0, 100, 1000, 2000, 10_000]; // ms

let mut last_err = None;
for backoff_ms in backoff_times {
match self.connect_if_needed().await {
Ok(()) => return Ok(self.connection.as_mut().unwrap()),
Err(err @ Error::Io(_)) => {
last_err = Some(err);

crate::rt::sleep(std::time::Duration::from_millis(backoff_ms)).await;
continue;
}
Err(other) => return Err(other),
}
}

// if 5 retries later still got IO error, return the last one and stop
Err(last_err.unwrap())
}

/// Receives the next notification available from any of the subscribed channels.
///
/// If the connection to PostgreSQL is lost, it is automatically reconnected on the next
Expand Down Expand Up @@ -258,10 +281,34 @@ impl PgListener {
///
/// [`eager_reconnect`]: PgListener::eager_reconnect
pub async fn try_recv(&mut self) -> Result<Option<PgNotification>, Error> {
match self.recv_without_recovery().await {
Ok(notification) => Ok(Some(notification)),

// The connection is dead, ensure that it is dropped,
// update self state, and loop to try again.
Err(Error::Io(_)) => {
if let Some(mut conn) = self.connection.take() {
self.buffer_tx = conn.inner.stream.notifications.take();
// Close the connection in a background task, so we can continue.
conn.close_on_drop();
}

if self.eager_reconnect {
self.connection_with_recovery().await?;
}

// lost connection
Ok(None)
}
Err(e) => Err(e),
}
}

async fn recv_without_recovery(&mut self) -> Result<PgNotification, Error> {
// Flush the buffer first, if anything
// This would only fill up if this listener is used as a connection
if let Some(notification) = self.next_buffered() {
return Ok(Some(notification));
return Ok(notification);
}

// Fetch our `CloseEvent` listener, if applicable.
Expand All @@ -278,45 +325,12 @@ impl PgListener {
next_message.await
};

let message = match res {
Ok(message) => message,

// The connection is dead, ensure that it is dropped,
// update self state, and loop to try again.
Err(Error::Io(err))
if matches!(
err.kind(),
io::ErrorKind::ConnectionAborted |
io::ErrorKind::UnexpectedEof |
// see ERRORS section in tcp(7) man page (https://man7.org/linux/man-pages/man7/tcp.7.html)
io::ErrorKind::TimedOut |
io::ErrorKind::BrokenPipe
) =>
{
if let Some(mut conn) = self.connection.take() {
self.buffer_tx = conn.inner.stream.notifications.take();
// Close the connection in a background task, so we can continue.
conn.close_on_drop();
}

if self.eager_reconnect {
self.connect_if_needed().await?;
}

// lost connection
return Ok(None);
}

// Forward other errors
Err(error) => {
return Err(error);
}
};
let message = res?;

match message.format {
// We've received an async notification, return it.
BackendMessageFormat::NotificationResponse => {
return Ok(Some(PgNotification(message.decode()?)));
return Ok(PgNotification(message.decode()?));
}

// Mark the connection as ready for another query
Expand Down
Loading