Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Replace `Vec<Option<u16>>` with `FixedBitSet` for managing packet ids of released QoS 2 publishes and incoming QoS 2 publishes in `MqttState`.
* Accept `native_tls::TlsConnector` as input for `Transport::tls_with_config`.
* Update `thiserror` to `2.0.8`, `tokio-rustls` to `0.26.0`, `rustls-webpki` to `0.102.8`, `rustls-pemfile` to `2.2.0`, `rustls-native-certs` to `0.8.1`, `async-tungstenite` to `0.28.0`, `ws_stream_tungstenite` to `0.14.0`, `native-tls` to `0.2.12` and `tokio-stream` to `0.1.16`.
* Make error types returned by `rumqttc::v5::Connection` public
* Make error types returned by `rumqttc::v5::Connection` public.
* Remove `request_tx` from `EventLoop` to allow gracefully exit.

### Deprecated

Expand Down
3 changes: 1 addition & 2 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ impl AsyncClient {
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.requests_tx.clone();
let (eventloop, request_tx) = EventLoop::new(options, cap);

let client = AsyncClient { request_tx };

Expand Down
24 changes: 12 additions & 12 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ pub struct EventLoop {
pub state: MqttState,
/// Request stream
requests_rx: Receiver<Request>,
/// Requests handle to send requests
pub(crate) requests_tx: Sender<Request>,
/// Pending packets from last session
pub pending: VecDeque<Request>,
/// Network connection to the broker
Expand All @@ -99,22 +97,24 @@ impl EventLoop {
///
/// When connection encounters critical errors (like auth failure), user has a choice to
/// access and update `options`, `state` and `requests`.
pub fn new(mqtt_options: MqttOptions, cap: usize) -> EventLoop {
pub fn new(mqtt_options: MqttOptions, cap: usize) -> (EventLoop, Sender<Request>) {
let (requests_tx, requests_rx) = bounded(cap);
let pending = VecDeque::new();
let max_inflight = mqtt_options.inflight;
let manual_acks = mqtt_options.manual_acks;

EventLoop {
mqtt_options,
state: MqttState::new(max_inflight, manual_acks),
(
EventLoop {
mqtt_options,
state: MqttState::new(max_inflight, manual_acks),
requests_rx,
pending,
network: None,
keepalive_timeout: None,
network_options: NetworkOptions::new(),
},
requests_tx,
requests_rx,
pending,
network: None,
keepalive_timeout: None,
network_options: NetworkOptions::new(),
}
)
}

/// Last session might contain packets which aren't acked. MQTT says these packets should be
Expand Down
3 changes: 1 addition & 2 deletions rumqttc/src/v5/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ impl AsyncClient {
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.requests_tx.clone();
let (eventloop, request_tx) = EventLoop::new(options, cap);

let client = AsyncClient { request_tx };

Expand Down
22 changes: 11 additions & 11 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ pub struct EventLoop {
pub state: MqttState,
/// Request stream
requests_rx: Receiver<Request>,
/// Requests handle to send requests
pub(crate) requests_tx: Sender<Request>,
/// Pending packets from last session
pub pending: VecDeque<Request>,
/// Network connection to the broker
Expand All @@ -96,21 +94,23 @@ impl EventLoop {
///
/// When connection encounters critical errors (like auth failure), user has a choice to
/// access and update `options`, `state` and `requests`.
pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
pub fn new(options: MqttOptions, cap: usize) -> (EventLoop, Sender<Request>) {
let (requests_tx, requests_rx) = bounded(cap);
let pending = VecDeque::new();
let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
let manual_acks = options.manual_acks;

EventLoop {
options,
state: MqttState::new(inflight_limit, manual_acks),
(
EventLoop {
options,
state: MqttState::new(inflight_limit, manual_acks),
requests_rx,
pending,
network: None,
keepalive_timeout: None,
},
requests_tx,
requests_rx,
pending,
network: None,
keepalive_timeout: None,
}
)
}

/// Last session might contain packets which aren't acked. MQTT says these packets should be
Expand Down
10 changes: 5 additions & 5 deletions rumqttc/tests/reliability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn connection_should_timeout_on_time() {

time::sleep(Duration::from_secs(1)).await;
let options = MqttOptions::new("dummy", "127.0.0.1", 1880);
let mut eventloop = EventLoop::new(options, 5);
let (mut eventloop, _sender) = EventLoop::new(options, 5);

let start = Instant::now();
let o = eventloop.poll().await;
Expand Down Expand Up @@ -121,7 +121,7 @@ async fn idle_connection_triggers_pings_on_time() {

// Create client eventloop and poll
task::spawn(async move {
let mut eventloop = EventLoop::new(options, 5);
let (mut eventloop, _sender) = EventLoop::new(options, 5);
run(&mut eventloop, false).await.unwrap();
});

Expand Down Expand Up @@ -200,7 +200,7 @@ async fn some_incoming_and_no_outgoing_should_trigger_pings_on_time() {
options.set_keep_alive(Duration::from_secs(keep_alive));

task::spawn(async move {
let mut eventloop = EventLoop::new(options, 5);
let (mut eventloop, _sender) = EventLoop::new(options, 5);
run(&mut eventloop, false).await.unwrap();
});

Expand Down Expand Up @@ -244,7 +244,7 @@ async fn detects_halfopen_connections_in_the_second_ping_request() {

time::sleep(Duration::from_secs(1)).await;
let start = Instant::now();
let mut eventloop = EventLoop::new(options, 5);
let (mut eventloop, _sender) = EventLoop::new(options, 5);
loop {
if let Err(e) = eventloop.poll().await {
match e {
Expand Down Expand Up @@ -455,7 +455,7 @@ async fn next_poll_after_connect_failure_reconnects() {
});

time::sleep(Duration::from_secs(1)).await;
let mut eventloop = EventLoop::new(options, 5);
let (mut eventloop, _sender) = EventLoop::new(options, 5);

match eventloop.poll().await {
Err(ConnectionError::ConnectionRefused(ConnectReturnCode::BadUserNamePassword)) => (),
Expand Down