Skip to content

Commit 4a5c0fa

Browse files
committed
fix(rumqttc): remove request_tx from EventLoop to allow EventLoop to
gracefully terminate
1 parent 005f73c commit 4a5c0fa

File tree

5 files changed

+30
-32
lines changed

5 files changed

+30
-32
lines changed

rumqttc/src/client.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ impl AsyncClient {
4949
///
5050
/// `cap` specifies the capacity of the bounded async channel.
5151
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
52-
let eventloop = EventLoop::new(options, cap);
53-
let request_tx = eventloop.requests_tx.clone();
52+
let (eventloop, request_tx) = EventLoop::new(options, cap);
5453

5554
let client = AsyncClient { request_tx };
5655

rumqttc/src/eventloop.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ pub struct EventLoop {
7676
pub state: MqttState,
7777
/// Request stream
7878
requests_rx: Receiver<Request>,
79-
/// Requests handle to send requests
80-
pub(crate) requests_tx: Sender<Request>,
8179
/// Pending packets from last session
8280
pub pending: VecDeque<Request>,
8381
/// Network connection to the broker
@@ -99,22 +97,24 @@ impl EventLoop {
9997
///
10098
/// When connection encounters critical errors (like auth failure), user has a choice to
10199
/// access and update `options`, `state` and `requests`.
102-
pub fn new(mqtt_options: MqttOptions, cap: usize) -> EventLoop {
100+
pub fn new(mqtt_options: MqttOptions, cap: usize) -> (EventLoop, Sender<Request>) {
103101
let (requests_tx, requests_rx) = bounded(cap);
104102
let pending = VecDeque::new();
105103
let max_inflight = mqtt_options.inflight;
106104
let manual_acks = mqtt_options.manual_acks;
107105

108-
EventLoop {
109-
mqtt_options,
110-
state: MqttState::new(max_inflight, manual_acks),
106+
(
107+
EventLoop {
108+
mqtt_options,
109+
state: MqttState::new(max_inflight, manual_acks),
110+
requests_rx,
111+
pending,
112+
network: None,
113+
keepalive_timeout: None,
114+
network_options: NetworkOptions::new(),
115+
},
111116
requests_tx,
112-
requests_rx,
113-
pending,
114-
network: None,
115-
keepalive_timeout: None,
116-
network_options: NetworkOptions::new(),
117-
}
117+
)
118118
}
119119

120120
/// Last session might contain packets which aren't acked. MQTT says these packets should be

rumqttc/src/v5/client.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ impl AsyncClient {
5454
///
5555
/// `cap` specifies the capacity of the bounded async channel.
5656
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
57-
let eventloop = EventLoop::new(options, cap);
58-
let request_tx = eventloop.requests_tx.clone();
57+
let (eventloop, request_tx) = EventLoop::new(options, cap);
5958

6059
let client = AsyncClient { request_tx };
6160

rumqttc/src/v5/eventloop.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,6 @@ pub struct EventLoop {
7474
pub state: MqttState,
7575
/// Request stream
7676
requests_rx: Receiver<Request>,
77-
/// Requests handle to send requests
78-
pub(crate) requests_tx: Sender<Request>,
7977
/// Pending packets from last session
8078
pub pending: VecDeque<Request>,
8179
/// Network connection to the broker
@@ -96,21 +94,23 @@ impl EventLoop {
9694
///
9795
/// When connection encounters critical errors (like auth failure), user has a choice to
9896
/// access and update `options`, `state` and `requests`.
99-
pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
97+
pub fn new(options: MqttOptions, cap: usize) -> (EventLoop, Sender<Request>) {
10098
let (requests_tx, requests_rx) = bounded(cap);
10199
let pending = VecDeque::new();
102100
let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
103101
let manual_acks = options.manual_acks;
104102

105-
EventLoop {
106-
options,
107-
state: MqttState::new(inflight_limit, manual_acks),
103+
(
104+
EventLoop {
105+
options,
106+
state: MqttState::new(inflight_limit, manual_acks),
107+
requests_rx,
108+
pending,
109+
network: None,
110+
keepalive_timeout: None,
111+
},
108112
requests_tx,
109-
requests_rx,
110-
pending,
111-
network: None,
112-
keepalive_timeout: None,
113-
}
113+
)
114114
}
115115

116116
/// Last session might contain packets which aren't acked. MQTT says these packets should be

rumqttc/tests/reliability.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async fn connection_should_timeout_on_time() {
7878

7979
time::sleep(Duration::from_secs(1)).await;
8080
let options = MqttOptions::new("dummy", "127.0.0.1", 1880);
81-
let mut eventloop = EventLoop::new(options, 5);
81+
let (mut eventloop, _sender) = EventLoop::new(options, 5);
8282

8383
let start = Instant::now();
8484
let o = eventloop.poll().await;
@@ -121,7 +121,7 @@ async fn idle_connection_triggers_pings_on_time() {
121121

122122
// Create client eventloop and poll
123123
task::spawn(async move {
124-
let mut eventloop = EventLoop::new(options, 5);
124+
let (mut eventloop, _sender) = EventLoop::new(options, 5);
125125
run(&mut eventloop, false).await.unwrap();
126126
});
127127

@@ -200,7 +200,7 @@ async fn some_incoming_and_no_outgoing_should_trigger_pings_on_time() {
200200
options.set_keep_alive(Duration::from_secs(keep_alive));
201201

202202
task::spawn(async move {
203-
let mut eventloop = EventLoop::new(options, 5);
203+
let (mut eventloop, _sender) = EventLoop::new(options, 5);
204204
run(&mut eventloop, false).await.unwrap();
205205
});
206206

@@ -244,7 +244,7 @@ async fn detects_halfopen_connections_in_the_second_ping_request() {
244244

245245
time::sleep(Duration::from_secs(1)).await;
246246
let start = Instant::now();
247-
let mut eventloop = EventLoop::new(options, 5);
247+
let (mut eventloop, _sender) = EventLoop::new(options, 5);
248248
loop {
249249
if let Err(e) = eventloop.poll().await {
250250
match e {
@@ -455,7 +455,7 @@ async fn next_poll_after_connect_failure_reconnects() {
455455
});
456456

457457
time::sleep(Duration::from_secs(1)).await;
458-
let mut eventloop = EventLoop::new(options, 5);
458+
let (mut eventloop, _sender) = EventLoop::new(options, 5);
459459

460460
match eventloop.poll().await {
461461
Err(ConnectionError::ConnectionRefused(ConnectReturnCode::BadUserNamePassword)) => (),

0 commit comments

Comments
 (0)