Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ rand = "0.9"
rcgen = "0.14"
ring = "0.17"
rustc-hash = "2"
rustls = { version = "0.23.5", default-features = false, features = ["std"] }
rustls = { version = "0.23.31", default-features = false, features = ["std"] }
rustls-pemfile = "2"
rustls-platform-verifier = "0.6"
rustls-pki-types = "1.7"
Expand Down
4 changes: 4 additions & 0 deletions perf/src/noprotection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ impl crypto::Session for NoProtectionSession {
) -> Result<(), crypto::ExportKeyingMaterialError> {
self.inner.export_keying_material(output, label, context)
}

fn resumption_tickets_received(&self) -> Option<u32> {
self.inner.resumption_tickets_received()
}
}

impl crypto::ClientConfig for NoProtectionClientConfig {
Expand Down
16 changes: 16 additions & 0 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ pub struct Connection {
stats: ConnectionStats,
/// QUIC version used for the connection.
version: u32,
/// True if we emitted the event for received resumption tickets
resumption_tickets_received: bool,
}

impl Connection {
Expand Down Expand Up @@ -355,6 +357,7 @@ impl Connection {
rng,
stats: ConnectionStats::default(),
version,
resumption_tickets_received: false,
};
if path_validated {
this.on_path_validated();
Expand Down Expand Up @@ -2113,6 +2116,12 @@ impl Connection {
}
}

if !self.resumption_tickets_received && self.crypto.resumption_tickets_received() > Some(0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the number of tickets received guaranteed to change at most once in the lifetime of a connection? If not, should we raise an event every time it changes?

{
self.resumption_tickets_received = true;
self.events.push_back(Event::ResumptionEnabled)
}

Ok(())
}

Expand Down Expand Up @@ -4000,6 +4009,13 @@ pub enum Event {
DatagramReceived,
/// One or more application datagrams have been sent after blocking
DatagramsUnblocked,
/// Resumption of the cryptographic session is now possible.
///
/// When using the rustls TLS session provider, this event is emitted when one or more
/// TLS session resumption tickets have been received.
///
/// It is only emitted on the client, and is emitted at most once per connection.
ResumptionEnabled,
}

fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
Expand Down
5 changes: 5 additions & 0 deletions quinn-proto/src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ pub trait Session: Send + Sync + 'static {
label: &[u8],
context: &[u8],
) -> Result<(), ExportKeyingMaterialError>;

/// Returns the number of TLS1.3 session resumption tickets that were received
///
/// Returns `None` on the server side.
fn resumption_tickets_received(&self) -> Option<u32>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Providing a default impl returning None would reduce churn for this niche use case.

}

/// A pair of keys for bidirectional communication
Expand Down
7 changes: 7 additions & 0 deletions quinn-proto/src/crypto/rustls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ impl crypto::Session for TlsSession {
.map_err(|_| ExportKeyingMaterialError)?;
Ok(())
}

fn resumption_tickets_received(&self) -> Option<u32> {
match &self.inner {
Connection::Client(conn) => Some(conn.tls13_tickets_received()),
Connection::Server(_) => None,
}
}
}

const RETRY_INTEGRITY_KEY_DRAFT: [u8; 16] = [
Expand Down
52 changes: 52 additions & 0 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ fn lifecycle() {
let _guard = subscribe();
let mut pair = Pair::default();
let (client_ch, server_ch) = pair.connect();
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
assert!(pair.client_conn_mut(client_ch).using_ecn());
assert!(pair.server_conn_mut(server_ch).using_ecn());
Expand Down Expand Up @@ -161,6 +165,10 @@ fn draft_version_compat() {
let mut pair = Pair::default();
let (client_ch, server_ch) = pair.connect_with(client_config);

assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
assert!(pair.client_conn_mut(client_ch).using_ecn());
assert!(pair.server_conn_mut(server_ch).using_ecn());
Expand Down Expand Up @@ -206,6 +214,10 @@ fn server_stateless_reset() {
pair.client.connections.get_mut(&client_ch).unwrap().ping();
info!("resetting");
pair.drive();
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ConnectionLost {
Expand Down Expand Up @@ -327,6 +339,10 @@ fn finish_stream_simple() {
pair.client_send(client_ch, s).finish().unwrap();
pair.drive();

assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::Stream(StreamEvent::Finished { id })) if id == s
Expand Down Expand Up @@ -379,6 +395,10 @@ fn reset_stream() {
let mut chunks = recv.read(false).unwrap();
assert_matches!(chunks.next(usize::MAX), Err(ReadError::Reset(ERROR)));
let _ = chunks.finalize();
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
}

Expand Down Expand Up @@ -597,6 +617,10 @@ fn zero_rtt_happypath() {
);
let _ = chunks.finalize();
assert_eq!(pair.client_conn_mut(client_ch).stats().path.lost_packets, 0);
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
}

#[test]
Expand Down Expand Up @@ -905,6 +929,10 @@ fn stream_id_limit() {
pair.client_send(client_ch, s).write(MSG).unwrap();
pair.client_send(client_ch, s).finish().unwrap();
pair.drive();
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::Stream(StreamEvent::Finished { id })) if id == s
Expand Down Expand Up @@ -1192,6 +1220,10 @@ fn idle_timeout() {
}

assert!(pair.time - start < Duration::from_millis(2 * IDLE_TIMEOUT));
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ConnectionLost {
Expand Down Expand Up @@ -1271,6 +1303,10 @@ fn migration() {
assert_ne!(pair.server_conn_mut(server_ch).total_recvd(), 0);

pair.drive();
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
assert_eq!(
pair.server_conn_mut(server_ch).remote_address(),
Expand Down Expand Up @@ -1657,6 +1693,10 @@ fn finish_stream_flow_control_reordered() {
pair.server.finish_delay(); // Add flow control packets after
pair.drive();

assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::Stream(StreamEvent::Finished { id })) if id == s
Expand Down Expand Up @@ -1749,6 +1789,10 @@ fn stop_during_finish() {
pair.drive_server();
pair.client_send(client_ch, s).finish().unwrap();
pair.drive_client();
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::Stream(StreamEvent::Stopped { id, error_code: ERROR })) if id == s
Expand Down Expand Up @@ -2036,6 +2080,10 @@ fn finish_acked() {
// Send FIN, receive data ack
info!("client receives ACK, sends FIN");
pair.drive_client();
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
// Check for premature finish from data ack
assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
// Process FIN ack
Expand Down Expand Up @@ -2074,6 +2122,10 @@ fn finish_retransmit() {
// Receive FIN ack, but no data ack
pair.drive_client();
// Check for premature finish from FIN ack
assert_matches!(
pair.client_conn_mut(client_ch).poll(),
Some(Event::ResumptionEnabled)
);
assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
// Recover
pair.drive();
Expand Down
52 changes: 52 additions & 0 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,37 @@ impl Connection {
// May need to send MAX_STREAMS to make progress
conn.wake();
}

/// Waits until the connection received TLS resumption tickets
///
/// Yields `true` once resumption tickets were received. Resolves immediately
/// if tickets were already received, otherwise it resolves once tickets arrive.
/// If the server does not send any tickets, the returned future will remain pending forever.
///
/// This should only be used on the client side. On the server side, it will
/// always resolve immediately and yield `false`.
pub fn resumption_tickets_received(&self) -> impl Future<Output = bool> + Send + 'static {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is anyone going to ever inspect this bool? Seems redundant to checking side().

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the user meant to actually get the ticket? If the expectation is that they'll still be going through rustls's ClientSessionStore, would it be simpler to bypass quinn entirely and use a ClientSessionStore that notifies you directly on insert?

let conn = self.0.clone();
async move {
let notify;
let (mut notified, out) = {
let conn = conn.state.lock("resumption_tickets_received");
let (notified, out) = match conn.resumption_tickets.as_ref() {
Some(ResumptionTicketState::Received) => (None, true),
Some(ResumptionTicketState::Pending(n)) => {
notify = n.clone();
(Some(notify.notified()), true)
}
None => (None, false),
};
(notified, out)
};
if let Some(notified) = notified.take() {
notified.await;
}
out
}
}
}

pin_project! {
Expand Down Expand Up @@ -885,6 +916,10 @@ impl ConnectionRef {
socket: Arc<dyn AsyncUdpSocket>,
runtime: Arc<dyn Runtime>,
) -> Self {
let resumption_tickets = match conn.side() {
Side::Client => Some(ResumptionTicketState::Pending(Default::default())),
Side::Server => None,
};
Self(Arc::new(ConnectionInner {
state: Mutex::new(State {
inner: conn,
Expand All @@ -907,6 +942,7 @@ impl ConnectionRef {
runtime,
send_buffer: Vec::new(),
buffered_transmit: None,
resumption_tickets,
}),
shared: Shared::default(),
}))
Expand Down Expand Up @@ -989,6 +1025,8 @@ pub(crate) struct State {
send_buffer: Vec<u8>,
/// We buffer a transmit when the underlying I/O would block
buffered_transmit: Option<proto::Transmit>,
/// Whether we received resumption tickets. None on the server side.
resumption_tickets: Option<ResumptionTicketState>,
}

impl State {
Expand Down Expand Up @@ -1123,6 +1161,14 @@ impl State {
wake_all_notify(&mut self.stopped);
}
}
ResumptionEnabled => {
if let Some(ResumptionTicketState::Pending(notify)) =
self.resumption_tickets.as_mut()
{
notify.notify_waiters();
self.resumption_tickets = Some(ResumptionTicketState::Received);
}
}
ConnectionLost { reason } => {
self.terminate(reason, shared);
}
Expand Down Expand Up @@ -1293,6 +1339,12 @@ fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
.for_each(|(_, notify)| notify.notify_waiters())
}

#[derive(Debug)]
enum ResumptionTicketState {
Received,
Pending(Arc<Notify>),
}

/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
Expand Down
65 changes: 65 additions & 0 deletions quinn/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,71 @@ async fn zero_rtt() {
endpoint.wait_idle().await;
}

#[tokio::test]
async fn zero_rtt_resumption() {
let _guard = subscribe();
let endpoint = endpoint();

let endpoint2 = endpoint.clone();
tokio::spawn(async move {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to join this at the end of the test to ensure any panics are surfaced?

for _ in 0..12 {
let incoming = endpoint2.accept().await.unwrap().accept().unwrap();
let (connection, _established) =
incoming.into_0rtt().unwrap_or_else(|_| unreachable!());
connection.closed().await;
}
});

let connect_0rtt = || {
endpoint
.connect(endpoint.local_addr().unwrap(), "localhost")
.unwrap()
.into_0rtt()
.unwrap_or_else(|_| panic!("missing 0-RTT keys"))
};

let connect_0rtt_fail = || {
endpoint
.connect(endpoint.local_addr().unwrap(), "localhost")
.unwrap()
.into_0rtt()
.err()
.expect("0-RTT succeeded without keys")
};

let connect_full = || async {
endpoint
.connect(endpoint.local_addr().unwrap(), "localhost")
.unwrap()
.into_0rtt()
.err()
.expect("0-RTT succeeded without keys")
.await
.expect("connect")
};

// 0rtt without full connection should fail
connect_0rtt_fail();
// now do a full connection
connect_full().await;
// we received two tickets, so we should be able to resume two times, and then fail
connect_0rtt();
connect_0rtt();
connect_0rtt_fail();

// now do another full connection
connect_full().await;
connect_0rtt();
// this time we wait to receive resumption tickets on the zero-rtt connection
let (conn, _0rtt_accepted) = connect_0rtt();
conn.resumption_tickets_received().await;
// and we can do two more 0rtt conns
connect_0rtt();
connect_0rtt();
// and then fail again
connect_0rtt_fail();
}

#[test]
#[cfg_attr(
any(target_os = "solaris", target_os = "illumos"),
Expand Down
Loading