Skip to content

Commit 59aa9b4

Browse files
committed
#505: Example of retrieving MQTT v5 properties in message received callback
1 parent 9a93f49 commit 59aa9b4

File tree

6 files changed

+250
-10
lines changed

6 files changed

+250
-10
lines changed

examples/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ set(EXECUTABLES
3131
async_publish
3232
async_publish_time
3333
async_subscribe
34+
async_subscribe_v5
3435
async_consume
3536
async_consume_v5
3637
async_message_consume

examples/async_consume_v5.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,7 @@ int main(int argc, char* argv[])
118118
cout << msg->get_topic() << ": " << msg->to_string();
119119

120120
const auto& props = msg->get_properties();
121-
size_t n = props.size();
122-
if (n != 0) {
121+
if (size_t n = props.size(); n != 0) {
123122
cout << "\n [";
124123
for (size_t i = 0; i < n - 1; ++i) cout << props[i] << ", ";
125124
cout << props[n - 1] << "]";

examples/async_subscribe.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
// interface, employing callbacks to receive messages and status updates.
77
//
88
// The sample demonstrates:
9-
// - Connecting to an MQTT server/broker.
9+
// - Connecting to an MQTT server/broker using MQTT v3.
1010
// - Subscribing to a topic
1111
// - Receiving messages through the callback API
1212
// - Receiving network disconnect updates and attempting manual reconnects.
@@ -15,7 +15,7 @@
1515
//
1616

1717
/*******************************************************************************
18-
* Copyright (c) 2013-2024 Frank Pagliughi <fpagliughi@mindspring.com>
18+
* Copyright (c) 2013-2025 Frank Pagliughi <fpagliughi@mindspring.com>
1919
*
2020
* All rights reserved. This program and the accompanying materials
2121
* are made available under the terms of the Eclipse Public License v2.0
@@ -42,7 +42,8 @@
4242

4343
const std::string DFLT_SERVER_URI("mqtt://localhost:1883");
4444
const std::string CLIENT_ID("paho_cpp_async_subscribe");
45-
const std::string TOPIC("hello");
45+
46+
const std::string TOPIC("#");
4647

4748
const int QOS = 1;
4849
const int N_RETRY_ATTEMPTS = 5;

examples/async_subscribe_v5.cpp

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
// async_subscribe.cpp
2+
//
3+
// This is a Paho MQTT C++ client, sample application.
4+
//
5+
// This application is an MQTT subscriber using the C++ asynchronous client
6+
// interface, employing callbacks to receive messages and status updates.
7+
//
8+
// The sample demonstrates:
9+
// - Connecting to an MQTT server/broker using MQTT v5.
10+
// - Subscribing to a topic
11+
// - Receiving messages through the callback API
12+
// - Displaying MQTT v5 message properties.
13+
// - Receiving network disconnect updates and attempting manual reconnects.
14+
// - Using a "clean session" and manually re-subscribing to topics on
15+
// reconnect.
16+
//
17+
18+
/*******************************************************************************
19+
* Copyright (c) 2013-2025 Frank Pagliughi <fpagliughi@mindspring.com>
20+
*
21+
* All rights reserved. This program and the accompanying materials
22+
* are made available under the terms of the Eclipse Public License v2.0
23+
* and Eclipse Distribution License v1.0 which accompany this distribution.
24+
*
25+
* The Eclipse Public License is available at
26+
* http://www.eclipse.org/legal/epl-v20.html
27+
* and the Eclipse Distribution License is available at
28+
* http://www.eclipse.org/org/documents/edl-v10.php.
29+
*
30+
* Contributors:
31+
* Frank Pagliughi - initial implementation and documentation
32+
*******************************************************************************/
33+
34+
#include <cctype>
35+
#include <chrono>
36+
#include <cstdlib>
37+
#include <cstring>
38+
#include <iostream>
39+
#include <string>
40+
#include <thread>
41+
42+
#include "mqtt/async_client.h"
43+
44+
const std::string DFLT_SERVER_URI("mqtt://localhost:1883");
45+
const std::string CLIENT_ID("paho_cpp_async_subscribe");
46+
const std::string TOPIC("#");
47+
48+
const int QOS = 1;
49+
const int N_RETRY_ATTEMPTS = 5;
50+
51+
/////////////////////////////////////////////////////////////////////////////
52+
53+
// Callbacks for the success or failures of requested actions.
54+
// This could be used to initiate further action, but here we just log the
55+
// results to the console.
56+
57+
class action_listener : public virtual mqtt::iaction_listener
58+
{
59+
std::string name_;
60+
61+
void on_failure(const mqtt::token& tok) override
62+
{
63+
std::cout << name_ << " failure";
64+
if (tok.get_message_id() != 0)
65+
std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
66+
std::cout << std::endl;
67+
}
68+
69+
void on_success(const mqtt::token& tok) override
70+
{
71+
std::cout << name_ << " success";
72+
if (tok.get_message_id() != 0)
73+
std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
74+
auto top = tok.get_topics();
75+
if (top && !top->empty())
76+
std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
77+
std::cout << std::endl;
78+
}
79+
80+
public:
81+
action_listener(const std::string& name) : name_(name) {}
82+
};
83+
84+
/////////////////////////////////////////////////////////////////////////////
85+
86+
/**
87+
* Local callback & listener class for use with the client connection.
88+
* This is primarily intended to receive messages, but it will also monitor
89+
* the connection to the broker. If the connection is lost, it will attempt
90+
* to restore the connection and re-subscribe to the topic.
91+
*/
92+
class callback : public virtual mqtt::callback, public virtual mqtt::iaction_listener
93+
94+
{
95+
// Counter for the number of connection retries
96+
int nretry_;
97+
// The MQTT client
98+
mqtt::async_client& cli_;
99+
// Options to use if we need to reconnect
100+
mqtt::connect_options& connOpts_;
101+
// An action listener to display the result of actions.
102+
action_listener subListener_;
103+
104+
// This deomonstrates manually reconnecting to the broker by calling
105+
// connect() again. This is a possibility for an application that keeps
106+
// a copy of it's original connect_options, or if the app wants to
107+
// reconnect with different options.
108+
// Another way this can be done manually, if using the same options, is
109+
// to just call the async_client::reconnect() method.
110+
void reconnect()
111+
{
112+
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
113+
try {
114+
cli_.connect(connOpts_, nullptr, *this);
115+
}
116+
catch (const mqtt::exception& exc) {
117+
std::cerr << "Error: " << exc.what() << std::endl;
118+
exit(1);
119+
}
120+
}
121+
122+
// Re-connection failure
123+
void on_failure(const mqtt::token& tok) override
124+
{
125+
std::cout << "Connection attempt failed" << std::endl;
126+
if (++nretry_ > N_RETRY_ATTEMPTS)
127+
exit(1);
128+
reconnect();
129+
}
130+
131+
// (Re)connection success
132+
// Either this or connected() can be used for callbacks.
133+
void on_success(const mqtt::token& tok) override {}
134+
135+
// (Re)connection success
136+
void connected(const std::string& cause) override
137+
{
138+
std::cout << "\nConnection success" << std::endl;
139+
std::cout << "\nSubscribing to topic '" << TOPIC << "'\n"
140+
<< "\tfor client " << CLIENT_ID << " using QoS" << QOS << "\n"
141+
<< "\nPress Q<Enter> to quit\n"
142+
<< std::endl;
143+
144+
cli_.subscribe(TOPIC, QOS, nullptr, subListener_);
145+
}
146+
147+
// Callback for when the connection is lost.
148+
// This will initiate the attempt to manually reconnect.
149+
void connection_lost(const std::string& cause) override
150+
{
151+
std::cout << "\nConnection lost" << std::endl;
152+
if (!cause.empty())
153+
std::cout << "\tcause: " << cause << std::endl;
154+
155+
std::cout << "Reconnecting..." << std::endl;
156+
nretry_ = 0;
157+
reconnect();
158+
}
159+
160+
// Callback for when a message arrives.
161+
void message_arrived(mqtt::const_message_ptr msg) override
162+
{
163+
std::cout << "\nMessage arrived" << std::endl;
164+
std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
165+
std::cout << "\tpayload: '" << msg->to_string() << std::endl;
166+
167+
const mqtt::properties& props = msg->get_properties();
168+
if (size_t n = props.size(); n != 0) {
169+
std::cout << "\tproperties (" << n << "):\n\t [";
170+
for (size_t i = 0; i < n - 1; ++i) std::cout << props[i] << ", ";
171+
std::cout << props[n - 1] << "]" << std::endl;
172+
}
173+
}
174+
175+
void delivery_complete(mqtt::delivery_token_ptr token) override {}
176+
177+
public:
178+
callback(mqtt::async_client& cli, mqtt::connect_options& connOpts)
179+
: nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription")
180+
{
181+
}
182+
};
183+
184+
/////////////////////////////////////////////////////////////////////////////
185+
186+
int main(int argc, char* argv[])
187+
{
188+
// A subscriber often wants the server to remember its messages when its
189+
// disconnected. In that case, it needs a unique ClientID and a
190+
// non-clean session.
191+
192+
auto serverURI = (argc > 1) ? std::string{argv[1]} : DFLT_SERVER_URI;
193+
194+
mqtt::async_client cli(serverURI, CLIENT_ID);
195+
196+
auto connOpts = mqtt::connect_options_builder::v5()
197+
.clean_start(true)
198+
.finalize();
199+
200+
// Install the callback(s) before connecting.
201+
callback cb(cli, connOpts);
202+
cli.set_callback(cb);
203+
204+
// Start the connection.
205+
// When completed, the callback will subscribe to topic.
206+
207+
try {
208+
std::cout << "Connecting to the MQTT server '" << serverURI << "'..." << std::flush;
209+
cli.connect(connOpts, nullptr, cb);
210+
}
211+
catch (const mqtt::exception& exc) {
212+
std::cerr << "\nERROR: Unable to connect to MQTT server: '" << serverURI << "'" << exc
213+
<< std::endl;
214+
return 1;
215+
}
216+
217+
// Just block till user tells us to quit.
218+
219+
while (std::tolower(std::cin.get()) != 'q');
220+
221+
// Disconnect
222+
223+
try {
224+
std::cout << "\nDisconnecting from the MQTT server..." << std::flush;
225+
cli.disconnect()->wait();
226+
std::cout << "OK" << std::endl;
227+
}
228+
catch (const mqtt::exception& exc) {
229+
std::cerr << exc << std::endl;
230+
return 1;
231+
}
232+
233+
return 0;
234+
}

src/client.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ client::client(
6161
}
6262

6363
client::client(const create_options& opts)
64-
: cli_(opts),
65-
timeout_(DFLT_TIMEOUT),
66-
userCallback_(nullptr)
64+
: cli_(opts), timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
6765
{
6866
}
6967

src/properties.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,15 @@ std::ostream& operator<<(std::ostream& os, const property& prop)
215215

216216
case MQTTPROPERTY_TYPE_BINARY_DATA: {
217217
auto bin = get<binary>(prop);
218-
for (const char& by : bin) os << std::hex << unsigned(by);
219-
os << std::dec;
218+
auto n = bin.size();
219+
os << '[';
220+
if (n > 0) {
221+
os << std::hex;
222+
for (size_t i = 0; i < n - 1; ++i) os << "0x" << unsigned(bin[i]) << ", ";
223+
os << "0x" << unsigned(bin[n - 1]);
224+
os << std::dec;
225+
}
226+
os << ']';
220227
} break;
221228

222229
case MQTTPROPERTY_TYPE_UTF_8_ENCODED_STRING:

0 commit comments

Comments
 (0)