Skip to content

refactor: keep v5 and non-v5 client api consistent as possible #861

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* `size()` method on `Packet` calculates size once serialized.
* `read()` and `write()` methods on `Packet`.
* `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection
* `connection_timeout()` and `set_connection_timeout()` method on (non-v5) `MqttOptions`.
* `network_options()` method on (non-v5) `MqttOptions` on to instead `EventLoop.network_options()`.
* `set_network_options()` method on (non-v5) `MqttOptions` on to instead `EventLoop.set_network_options()`.

### Changed

* rename `N` as `AsyncReadWrite` to describe usage.
* use `Framed` to encode/decode MQTT packets.
* use `Login` to store credentials
* use `Login` to store credentials.

### Deprecated

* Use (non-v5) `MqttOptions.network_options` instead of `EventLoop.network_options`.
* Use (non-v5) `MqttOptions.set_network_options` instead of `EventLoop.set_network_options`.

### Removed

* move `NetworkOptions.conn_timeout` to (non-v5) `MqttOptions.conn_timeout`.

### Fixed

* Validate filters while creating subscription requests.
Expand Down
31 changes: 14 additions & 17 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ pub struct EventLoop {
pub network: Option<Network>,
/// Keep alive time
keepalive_timeout: Option<Pin<Box<Sleep>>>,
pub network_options: NetworkOptions,
}

/// Events which can be yielded by the event loop
Expand Down Expand Up @@ -113,7 +112,6 @@ impl EventLoop {
pending,
network: None,
keepalive_timeout: None,
network_options: NetworkOptions::new(),
}
}

Expand Down Expand Up @@ -141,8 +139,8 @@ impl EventLoop {
pub async fn poll(&mut self) -> Result<Event, ConnectionError> {
if self.network.is_none() {
let (network, connack) = match time::timeout(
Duration::from_secs(self.network_options.connection_timeout()),
connect(&self.mqtt_options, self.network_options.clone()),
Duration::from_secs(self.mqtt_options.connection_timeout()),
connect(&self.mqtt_options),
)
.await
{
Expand Down Expand Up @@ -173,7 +171,7 @@ impl EventLoop {
// let await_acks = self.state.await_acks;
let inflight_full = self.state.inflight >= self.mqtt_options.inflight;
let collision = self.state.collision.is_some();
let network_timeout = Duration::from_secs(self.network_options.connection_timeout());
let network_timeout = Duration::from_secs(self.mqtt_options.connection_timeout());

// Read buffered events from previous polls before calling a new poll
if let Some(event) = self.state.events.pop_front() {
Expand Down Expand Up @@ -258,12 +256,17 @@ impl EventLoop {
}
}

#[deprecated(since = "0.25.0", note = "Use `MqttOptions.network_options` instead.")]
pub fn network_options(&self) -> NetworkOptions {
self.network_options.clone()
self.mqtt_options.network_options()
}

#[deprecated(
since = "0.25.0",
note = "Use `MqttOptions.set_network_options` instead."
)]
pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
self.network_options = network_options;
self.mqtt_options.set_network_options(network_options);
self
}

Expand Down Expand Up @@ -291,12 +294,9 @@ impl EventLoop {
/// the stream.
/// This function (for convenience) includes internal delays for users to perform internal sleeps
/// between re-connections so that cancel semantics can be used during this sleep
async fn connect(
mqtt_options: &MqttOptions,
network_options: NetworkOptions,
) -> Result<(Network, Incoming), ConnectionError> {
async fn connect(mqtt_options: &MqttOptions) -> Result<(Network, Incoming), ConnectionError> {
// connect to the broker
let mut network = network_connect(mqtt_options, network_options).await?;
let mut network = network_connect(mqtt_options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(mqtt_options, &mut network).await?;
Expand Down Expand Up @@ -350,10 +350,7 @@ pub(crate) async fn socket_connect(
}))
}

async fn network_connect(
options: &MqttOptions,
network_options: NetworkOptions,
) -> Result<Network, ConnectionError> {
async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionError> {
// Process Unix files early, as proxy is not supported for them.
#[cfg(unix)]
if matches!(options.transport(), Transport::Unix) {
Expand Down Expand Up @@ -389,7 +386,7 @@ async fn network_connect(
#[cfg(not(feature = "proxy"))]
{
let addr = format!("{domain}:{port}");
let tcp = socket_connect(addr, network_options).await?;
let tcp = socket_connect(addr, options.network_options()).await?;
Box::new(tcp)
}
};
Expand Down
42 changes: 29 additions & 13 deletions rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ impl From<ClientConfig> for TlsConfiguration {
pub struct NetworkOptions {
tcp_send_buffer_size: Option<u32>,
tcp_recv_buffer_size: Option<u32>,
conn_timeout: u64,
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
bind_device: Option<String>,
}
Expand All @@ -379,7 +378,6 @@ impl NetworkOptions {
NetworkOptions {
tcp_send_buffer_size: None,
tcp_recv_buffer_size: None,
conn_timeout: 5,
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
bind_device: None,
}
Expand All @@ -393,17 +391,6 @@ impl NetworkOptions {
self.tcp_recv_buffer_size = Some(size);
}

/// set connection timeout in secs
pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
self.conn_timeout = timeout;
self
}

/// get timeout in secs
pub fn connection_timeout(&self) -> u64 {
self.conn_timeout
}

/// bind connection to a specific network device by name
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
#[cfg_attr(
Expand Down Expand Up @@ -449,11 +436,15 @@ pub struct MqttOptions {
pending_throttle: Duration,
/// maximum number of outgoing inflight messages
inflight: u16,
/// Connection timeout
conn_timeout: u64,
/// Last will that will be issued on unexpected disconnect
last_will: Option<LastWill>,
/// If set to `true` MQTT acknowledgements are not sent automatically.
/// Every incoming publish packet must be manually acknowledged with `client.ack(...)` method.
manual_acks: bool,
/// Provides a way to configure low level network connection configurations.
network_options: NetworkOptions,
#[cfg(feature = "proxy")]
/// Proxy configuration.
proxy: Option<Proxy>,
Expand Down Expand Up @@ -487,7 +478,9 @@ impl MqttOptions {
pending_throttle: Duration::from_micros(0),
inflight: 100,
last_will: None,
conn_timeout: 5,
manual_acks: false,
network_options: NetworkOptions::new(),
#[cfg(feature = "proxy")]
proxy: None,
#[cfg(feature = "websocket")]
Expand Down Expand Up @@ -677,6 +670,28 @@ impl MqttOptions {
self.manual_acks
}

/// set connection timeout in secs
pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
self.conn_timeout = timeout;
self
}

/// get timeout in secs
pub fn connection_timeout(&self) -> u64 {
self.conn_timeout
}

/// get network options
pub fn network_options(&self) -> NetworkOptions {
self.network_options.clone()
}

/// set network options
pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
self.network_options = network_options;
self
}

#[cfg(feature = "proxy")]
pub fn set_proxy(&mut self, proxy: Proxy) -> &mut Self {
self.proxy = Some(proxy);
Expand Down Expand Up @@ -891,6 +906,7 @@ impl Debug for MqttOptions {
.field("pending_throttle", &self.pending_throttle)
.field("inflight", &self.inflight)
.field("last_will", &self.last_will)
.field("conn_timeout", &self.conn_timeout)
.field("manual_acks", &self.manual_acks)
.finish()
}
Expand Down
3 changes: 3 additions & 0 deletions rumqttc/src/v5/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub struct MqttOptions {
/// If set to `true` MQTT acknowledgements are not sent automatically.
/// Every incoming publish packet must be manually acknowledged with `client.ack(...)` method.
manual_acks: bool,
/// Provides a way to configure low level network connection configurations.
network_options: NetworkOptions,
#[cfg(feature = "proxy")]
/// Proxy configuration.
Expand Down Expand Up @@ -494,10 +495,12 @@ impl MqttOptions {
self.manual_acks
}

/// get network options
pub fn network_options(&self) -> NetworkOptions {
self.network_options.clone()
}

/// set network options
pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
self.network_options = network_options;
self
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub enum StateError {
#[error("Connection failed with reason '{reason:?}' ")]
ConnFail { reason: ConnectReturnCode },
#[error("Connection closed by peer abruptly")]
ConnectionAborted
ConnectionAborted,
}

impl From<mqttbytes::Error> for StateError {
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/tests/reliability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,9 @@ async fn state_is_being_cleaned_properly_and_pending_request_calculated_properly
options.set_keep_alive(Duration::from_secs(5));
let mut network_options = NetworkOptions::new();
network_options.set_tcp_send_buffer_size(1024);
options.set_network_options(network_options);

let (client, mut eventloop) = AsyncClient::new(options, 5);
eventloop.set_network_options(network_options);
task::spawn(async move {
start_requests_with_payload(100, QoS::AtLeastOnce, 0, client, 5000).await;
time::sleep(Duration::from_secs(10)).await;
Expand Down