23 #include <nlohmann/json.hpp>
34 public virtual mqtt::iaction_listener
39 std::shared_ptr<mqtt::async_client>
cli_;
46 INFO(
"Will try to reconnect to MQTT broker in 10 seconds...");
47 std::this_thread::sleep_for(std::chrono::milliseconds(10000));
50 cli_->connect(connOpts_,
nullptr, *
this);
52 catch (
const mqtt::exception& e)
60 ERROR(
"MQTT broker connection attempt failed.");
67 ERROR(
"Maximum retry attempts reached. We will not try to reconnect to the MQTT broker anymore.");
77 INFO(
"Connected successfully to the MQTT broker.");
79 for (
const auto& topic : server_.
topics())
81 cli_->subscribe(server_.
config()->subscribe_prefix() + topic->subject(),
QOS);
87 WARN(
"Connection lost to the MQTT broker.");
93 WARN(
"Reconnecting to MQTT broker...");
100 INFO(
"MQTT message arrived:"
101 <<
"\n\t Topic: " << msg->get_topic()
102 <<
"\n\t Payload: " << msg->to_string());
113 : nretry_(0), cli_(cli), connOpts_(connOpts), server_(server)
119 std::shared_ptr<const MqttServerConfig> config,
120 const std::vector<std::shared_ptr<const MqttExternalMessage>> &topics)
121 : sock_(ctx,
zmqpp::socket_type::rep)
122 , bus_push_(ctx,
zmqpp::socket_type::push)
126 bus_push_.connect(
"inproc://zmq-bus-pull");
130 for (
const auto &topic :
topics)
134 auto topic_sock = zmqpp::socket(ctx, zmqpp::socket_type::rep);
135 topic_sock.bind(
"inproc://" + topic->name());
138 std::pair<std::string, zmqpp::socket>(
140 std::move(topic_sock)
153 reactor->add(t.second,
160 client_ = std::make_shared<mqtt::async_client>(std::string(
config_->ssl() ?
"ssl" :
"tcp") +
163 auto connOpts = mqtt::connect_options_builder()
166 if (!
config_->username().empty())
168 connOpts.set_user_name(
config_->username());
169 connOpts.set_password(
config_->password());
173 connOpts.set_ssl(mqtt::ssl_options(
config_->ssl_ca_certs(),
174 config_->ssl_client_certfile(),
184 client_->connect(connOpts,
nullptr, cb);
196 topic_sock.receive(msg);
197 std::string topic_value;
200 mqtt::message_ptr pubmsg;
201 std::shared_ptr<const MqttExternalMessage> extmsg;
202 for (
const auto topic :
topics_)
204 if (topic->name() == topic_name)
209 ASSERT_LOG(extmsg !=
nullptr,
"Unknown topic.");
211 bool interupt =
false;
216 std::string payload = extmsg->
payload();
223 payload = boost::replace_all_copy(payload,
"__PLACEHOLDER__", state);
225 INFO(
"Payload to be published as MQTT message:" << payload);
226 pubmsg = mqtt::make_message(
config_->publish_prefix() + extmsg->
name(), payload);
230 ASSERT_LOG(pubmsg !=
nullptr,
"Unsupported topic virtual type for MQTT module.");
231 pubmsg->set_qos(
QOS);
237 topic_sock.send(
"OK");
242 zmsg << (
"S_INT:" + extmsg->
name()) << topic_value;
246 catch(
const mqtt::exception& e)
248 topic_sock.send(
"KO");
255 std::shared_ptr<const MqttExternalMessage> extmsg;
256 for (
const auto topic :
topics_)
258 if (
config_->subscribe_prefix() + topic->subject() == msg->get_topic())
263 ASSERT_LOG(extmsg !=
nullptr,
"Unknown topic.");
265 const std::string payload = extmsg->
payload();
266 std::string msg_value = msg->to_string();
267 if (!payload.empty())
269 auto msg_json = nlohmann::json::parse(msg->to_string());
270 auto d = msg_json.at(payload);
273 msg_value = d.get<std::string>();
277 ERROR(
"Cannot found json key `" << payload <<
"` on received MQTT message.");
280 INFO(
"Extracted value to be forwarded to ZMQ: " << msg_value);
284 if (msg_value ==
"1")
288 else if (msg_value ==
"0")
294 zmsg << (
"S_INT:" + extmsg->
name()) << msg_value;
297 else if (extmsg->
virtualtype() == DeviceClass::RFID_READER)
313 assert(str ==
"CONNECT" || str ==
"DISCONNECT");
314 if (str ==
"CONNECT")
322 catch(
const mqtt::exception& e)
328 else if (str ==
"DISCONNECT")
336 catch(
const mqtt::exception& e)