I try to implement an asynchronous MQTT client with the paho library, that receives messages on topic "request", formulates a string and puts the response out on topic "response". I use the callbacks to handle the incoming messages.
#include "mqtt/async_client.h"
#include "mqtt/topic.h"
const std::string SERVER_ADDRESS {"tcp://localhost:2883"};
const std::string CLIENT_ID {"test_client"};
class TestCallback : public virtual mqtt::callback
{
// the mqtt client
mqtt::async_client& cli_;
// (re)connection success
void connected(const std::string& cause) override
{
cli_.subscribe("request", 0);
}
// callback for when a message arrives.
void message_arrived(mqtt::const_message_ptr msg) override
{
if( msg->get_topic() == "request" )
{
/* format response message here and put it into (string) msg */
mqtt::message_ptr pubmsg = mqtt::make_message("response", msg);
pubmsg->set_qos(2);
//// PROBLEMATIC CODE ////
cli_.publish(pubmsg)->wait();
//////////////////////////
}
}
public:
TestCallback(mqtt::async_client& cli)
: cli_(cli) {}
};
int main(int argc, char** argv)
{
mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
TestCallback cb(cli);
cli.set_callback(cb);
mqtt::connect_options connOpts = mqtt::connect_options_builder()
.clean_session(false)
.automatic_reconnect()
.finalize();
try
{
cli.connect(connOpts)->wait();
}
catch (const mqtt::exception& exc)
{
std::cerr << "[ERROR] " << exc.what() << std::endl;
return 1;
}
// run until the application is shut down
while (std::tolower(std::cin.get()) != 'q')
;
try
{
cli.disconnect()->wait();
}
catch (const mqtt::exception& exc)
{
std::cerr << "[ERROR] " << exc.what() << std::endl;
return 1;
}
return 0;
}
The problem arises when I try to publish the response message, as the client seems to wait indefinitely. Responsible for this is the wait
function which is used on a token to track the status of the published message (reference). To my understanding, this has to be done especially when using higher levels of QoS so ensure everything went well.
Upon removal of the call to wait()
, it works as expected. But I am not sure if this ensures the correct publishing of messages.
What is the correct way to do this?
CodePudding user response:
I'm going to make a guess here, because I don't really know how async works in C .
The MQTT client has a single message handling thread, this deals with all the incoming and outgoing TCP packets as they arrive/depart on the socket. When a new MQTT message arrives it then calls the message handler callback (message_arrived
), in which you call publish
and wait
for it to complete. But because the call to wait
effectively blocks message_arrived
the message handling thread can not continue. This means it can not deal with the 3 legged QOS2 handshake required for the publish
to complete, hence it hangs.
I will also guess that if you changed the publish to QOS 0 it would complete, but would also fail with QOS 1 as that requires the message handling thread to send/receive multiple messages to continue.
Not waiting for the publish to complete is probably the correct solution.