Open
Description
I want to manually listen to the linux signal in rust to handle the disconnection。
but has bug.
My guess is that there is a processing signal at the c, but this signal is intercepted by my upper layer, causing the c layer not to process it?
full code
use std::{env, process, thread, time::Duration};
use tokio::signal::unix::{signal, SignalKind};
extern crate paho_mqtt as mqtt;
const DFLT_BROKER: &str = "tcp://localhost:1883";
const DFLT_CLIENT: &str = "rust_subscribe";
const DFLT_TOPICS: &[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS: &[i32] = &[0, 1];
// Reconnect to the broker when connection is lost.
async fn try_reconnect(cli: &mqtt::AsyncClient) -> bool {
println!("Connection lost. Waiting to retry connection");
for _ in 0..12 {
thread::sleep(Duration::from_millis(5000));
if cli.reconnect().await.is_ok() {
println!("Successfully reconnected");
return true;
}
}
println!("Unable to reconnect after several attempts.");
false
}
// Subscribes to multiple topics.
async fn subscribe_topics(cli: &mqtt::AsyncClient) {
if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS).await {
println!("Error subscribes topics: {:?}", e);
process::exit(1);
}
}
#[tokio::main]
async fn main() {
let host = env::args()
.nth(1)
.unwrap_or_else(|| DFLT_BROKER.to_string());
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.persistence(None)
.finalize();
// Create a client.
let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
let cloned_cli = cli.clone();
tokio::spawn(async move {
// Initialize the consumer before connecting.
let rx = cli.start_consuming();
let lwt_props = mqtt::properties!(mqtt::PropertyCode::WillDelayInterval => 10);
// Define the set of options for the connection.
let lwt = mqtt::MessageBuilder::new()
.topic("lost_connection_topic")
.payload("{}")
.properties(lwt_props)
.qos(1)
.finalize();
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(false)
.will_message(lwt)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts).await {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
// Subscribe topics.
subscribe_topics(&cli).await;
println!("Processing requests...");
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
} else if !cli.is_connected() {
if try_reconnect(&cli).await {
println!("Resubscribe topics...");
subscribe_topics(&cli).await;
} else {
break;
}
}
}
});
let mut stream = signal(SignalKind::terminate()).unwrap();
stream.recv().await;
println!("gruceful shutdown")
}
start it
cargo run
stop it
ctrl + c
At this point the process will get stuck。
my dependencies
[dependencies]
paho-mqtt = "0.11.1"
emqx server version: 5.0.13
Metadata
Metadata
Assignees
Labels
No labels