diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 1045cfcf1..feb3fa240 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -12,17 +12,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `size()` method on `Packet` calculates size once serialized. * `read()` and `write()` methods on `Packet`. * `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection +* `connection_timeout()` and `set_connection_timeout()` method on (non-v5) `MqttOptions`. +* `network_options()` method on (non-v5) `MqttOptions` on to instead `EventLoop.network_options()`. +* `set_network_options()` method on (non-v5) `MqttOptions` on to instead `EventLoop.set_network_options()`. ### Changed * rename `N` as `AsyncReadWrite` to describe usage. * use `Framed` to encode/decode MQTT packets. -* use `Login` to store credentials +* use `Login` to store credentials. ### Deprecated +* Use (non-v5) `MqttOptions.network_options` instead of `EventLoop.network_options`. +* Use (non-v5) `MqttOptions.set_network_options` instead of `EventLoop.set_network_options`. + ### Removed +* move `NetworkOptions.conn_timeout` to (non-v5) `MqttOptions.conn_timeout`. + ### Fixed * Validate filters while creating subscription requests. diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index a9b1ce8c5..2b32a84e8 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -84,7 +84,6 @@ pub struct EventLoop { pub network: Option, /// Keep alive time keepalive_timeout: Option>>, - pub network_options: NetworkOptions, } /// Events which can be yielded by the event loop @@ -113,7 +112,6 @@ impl EventLoop { pending, network: None, keepalive_timeout: None, - network_options: NetworkOptions::new(), } } @@ -141,8 +139,8 @@ impl EventLoop { pub async fn poll(&mut self) -> Result { if self.network.is_none() { let (network, connack) = match time::timeout( - Duration::from_secs(self.network_options.connection_timeout()), - connect(&self.mqtt_options, self.network_options.clone()), + Duration::from_secs(self.mqtt_options.connection_timeout()), + connect(&self.mqtt_options), ) .await { @@ -173,7 +171,7 @@ impl EventLoop { // let await_acks = self.state.await_acks; let inflight_full = self.state.inflight >= self.mqtt_options.inflight; let collision = self.state.collision.is_some(); - let network_timeout = Duration::from_secs(self.network_options.connection_timeout()); + let network_timeout = Duration::from_secs(self.mqtt_options.connection_timeout()); // Read buffered events from previous polls before calling a new poll if let Some(event) = self.state.events.pop_front() { @@ -258,12 +256,17 @@ impl EventLoop { } } + #[deprecated(since = "0.25.0", note = "Use `MqttOptions.network_options` instead.")] pub fn network_options(&self) -> NetworkOptions { - self.network_options.clone() + self.mqtt_options.network_options() } + #[deprecated( + since = "0.25.0", + note = "Use `MqttOptions.set_network_options` instead." + )] pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self { - self.network_options = network_options; + self.mqtt_options.set_network_options(network_options); self } @@ -291,12 +294,9 @@ impl EventLoop { /// the stream. /// This function (for convenience) includes internal delays for users to perform internal sleeps /// between re-connections so that cancel semantics can be used during this sleep -async fn connect( - mqtt_options: &MqttOptions, - network_options: NetworkOptions, -) -> Result<(Network, Incoming), ConnectionError> { +async fn connect(mqtt_options: &MqttOptions) -> Result<(Network, Incoming), ConnectionError> { // connect to the broker - let mut network = network_connect(mqtt_options, network_options).await?; + let mut network = network_connect(mqtt_options).await?; // make MQTT connection request (which internally awaits for ack) let packet = mqtt_connect(mqtt_options, &mut network).await?; @@ -350,10 +350,7 @@ pub(crate) async fn socket_connect( })) } -async fn network_connect( - options: &MqttOptions, - network_options: NetworkOptions, -) -> Result { +async fn network_connect(options: &MqttOptions) -> Result { // Process Unix files early, as proxy is not supported for them. #[cfg(unix)] if matches!(options.transport(), Transport::Unix) { @@ -389,7 +386,7 @@ async fn network_connect( #[cfg(not(feature = "proxy"))] { let addr = format!("{domain}:{port}"); - let tcp = socket_connect(addr, network_options).await?; + let tcp = socket_connect(addr, options.network_options()).await?; Box::new(tcp) } }; diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index 29cad1a34..6c6ad0385 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -369,7 +369,6 @@ impl From for TlsConfiguration { pub struct NetworkOptions { tcp_send_buffer_size: Option, tcp_recv_buffer_size: Option, - conn_timeout: u64, #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] bind_device: Option, } @@ -379,7 +378,6 @@ impl NetworkOptions { NetworkOptions { tcp_send_buffer_size: None, tcp_recv_buffer_size: None, - conn_timeout: 5, #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] bind_device: None, } @@ -393,17 +391,6 @@ impl NetworkOptions { self.tcp_recv_buffer_size = Some(size); } - /// set connection timeout in secs - pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self { - self.conn_timeout = timeout; - self - } - - /// get timeout in secs - pub fn connection_timeout(&self) -> u64 { - self.conn_timeout - } - /// bind connection to a specific network device by name #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] #[cfg_attr( @@ -449,11 +436,15 @@ pub struct MqttOptions { pending_throttle: Duration, /// maximum number of outgoing inflight messages inflight: u16, + /// Connection timeout + conn_timeout: u64, /// Last will that will be issued on unexpected disconnect last_will: Option, /// If set to `true` MQTT acknowledgements are not sent automatically. /// Every incoming publish packet must be manually acknowledged with `client.ack(...)` method. manual_acks: bool, + /// Provides a way to configure low level network connection configurations. + network_options: NetworkOptions, #[cfg(feature = "proxy")] /// Proxy configuration. proxy: Option, @@ -487,7 +478,9 @@ impl MqttOptions { pending_throttle: Duration::from_micros(0), inflight: 100, last_will: None, + conn_timeout: 5, manual_acks: false, + network_options: NetworkOptions::new(), #[cfg(feature = "proxy")] proxy: None, #[cfg(feature = "websocket")] @@ -677,6 +670,28 @@ impl MqttOptions { self.manual_acks } + /// set connection timeout in secs + pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self { + self.conn_timeout = timeout; + self + } + + /// get timeout in secs + pub fn connection_timeout(&self) -> u64 { + self.conn_timeout + } + + /// get network options + pub fn network_options(&self) -> NetworkOptions { + self.network_options.clone() + } + + /// set network options + pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self { + self.network_options = network_options; + self + } + #[cfg(feature = "proxy")] pub fn set_proxy(&mut self, proxy: Proxy) -> &mut Self { self.proxy = Some(proxy); @@ -891,6 +906,7 @@ impl Debug for MqttOptions { .field("pending_throttle", &self.pending_throttle) .field("inflight", &self.inflight) .field("last_will", &self.last_will) + .field("conn_timeout", &self.conn_timeout) .field("manual_acks", &self.manual_acks) .finish() } diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index 44499cde2..7188e0fdd 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -95,6 +95,7 @@ pub struct MqttOptions { /// If set to `true` MQTT acknowledgements are not sent automatically. /// Every incoming publish packet must be manually acknowledged with `client.ack(...)` method. manual_acks: bool, + /// Provides a way to configure low level network connection configurations. network_options: NetworkOptions, #[cfg(feature = "proxy")] /// Proxy configuration. @@ -494,10 +495,12 @@ impl MqttOptions { self.manual_acks } + /// get network options pub fn network_options(&self) -> NetworkOptions { self.network_options.clone() } + /// set network options pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self { self.network_options = network_options; self diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 854aa7b0f..5d415dc3f 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -65,7 +65,7 @@ pub enum StateError { #[error("Connection failed with reason '{reason:?}' ")] ConnFail { reason: ConnectReturnCode }, #[error("Connection closed by peer abruptly")] - ConnectionAborted + ConnectionAborted, } impl From for StateError { diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index 0a83d57ce..e52a9593f 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -550,9 +550,9 @@ async fn state_is_being_cleaned_properly_and_pending_request_calculated_properly options.set_keep_alive(Duration::from_secs(5)); let mut network_options = NetworkOptions::new(); network_options.set_tcp_send_buffer_size(1024); + options.set_network_options(network_options); let (client, mut eventloop) = AsyncClient::new(options, 5); - eventloop.set_network_options(network_options); task::spawn(async move { start_requests_with_payload(100, QoS::AtLeastOnce, 0, client, 5000).await; time::sleep(Duration::from_secs(10)).await;