Skip to content

mqtt v5 subscriber disconnect problem #195

Open
@xiaozefeng

Description

@xiaozefeng

I use paho-mqtt v5 to subscribe to a topic and after a period of almost 12 hours it disconnects

rust version :

rustup 1.25.2 (17db695f1 2023-02-01)
info: This is the version for the rustup toolchain manager, not the rustc compiler.
info: The currently active `rustc` version is `rustc 1.67.1 (d5a82bbd2 2023-02-07)`

the os:
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 20.04.5 LTS
Release: 20.04
Codename: focal

this is the Cargo.toml

tokio= {version = "1", features = ["full"]}
tracing="0.1.3"
tracing-subscriber = "0.3.16"
paho-mqtt = "0.12.0"
tokio-stream = "0.1.12"

this is the code :

use std::time::Duration;

use mqtt::MQTT_VERSION_5;
use paho_mqtt as mqtt;
use tokio_stream::StreamExt;
use tracing::info;
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let host = "mqtt://localhost:1883";
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id("paho-mqtt-client-v5")
        .finalize();
    let mut cli = mqtt::AsyncClient::new(create_opts).unwrap();
    let mut strm = cli.get_stream(25);

    let conn_opts = mqtt::ConnectOptionsBuilder::with_mqtt_version(MQTT_VERSION_5)
        .keep_alive_interval(Duration::from_secs(30))
        .clean_start(false)
        .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600])
        .user_name("username")
        .password("password")
        .finalize();

    info!("connecting to mqtt server...");
    cli.connect(conn_opts).await.unwrap();

    let topic = "mining_updater_topic_test";
    info!("subscribing topic: {}", topic);

    cli.subscribe(topic, 1).await.unwrap();

    info!("waiting for receive message...");

    while let Some(msg_opt) = strm.next().await {
        if let Some(msg) = msg_opt {
            println!("{msg}");
        } else {
            info!("lost connection to mqtt server");
            while let Err(e) = cli.reconnect().await {
                println!("Error reconnect: {}", e);
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions