Leosac  0.8.0
Open Source Access Control
MqttExternalServer.cpp
Go to the documentation of this file.
1 /*
2  Copyright (C) 2014-2022 Leosac
3 
4  This file is part of Leosac.
5 
6  Leosac is free software: you can redistribute it and/or modify
7  it under the terms of the GNU Affero General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  Leosac is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU Affero General Public License for more details.
15 
16  You should have received a copy of the GNU Affero General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19 
#include "MqttExternalServer.hpp"
#include "tools/log.hpp"
#include <core/auth/Auth.hpp>
#include <nlohmann/json.hpp>
#include <cassert>
#include <chrono>
#include <exception>
#include <iomanip>
#include <memory>
#include <string>
#include <thread>
#include <utility>
25 
26 using namespace Leosac::Module::Mqtt;
27 using namespace Leosac::Hardware;
28 using namespace Leosac::Auth;
29 
/// MQTT quality-of-service level passed to every subscribe and publish in this file.
constexpr int QOS = 1;
/// Number of consecutive failed connection attempts after which reconnection stops.
constexpr int N_RETRY_ATTEMPTS = 60;
32 
33 class mqtt_callback : public virtual mqtt::callback,
34  public virtual mqtt::iaction_listener
35 {
36  // Counter for the number of connection retries
37  int nretry_;
38  // The MQTT client
39  std::shared_ptr<mqtt::async_client> cli_;
40  // Options to use if we need to reconnect
41  mqtt::connect_options& connOpts_;
43 
44  void reconnect()
45  {
46  INFO("Will try to reconnect to MQTT broker in 10 seconds...");
47  std::this_thread::sleep_for(std::chrono::milliseconds(10000));
48  try
49  {
50  cli_->connect(connOpts_, nullptr, *this);
51  }
52  catch (const mqtt::exception& e)
53  {
54  ERROR(e.what());
55  }
56  }
57 
58  void on_failure(const mqtt::token& tok) override
59  {
60  ERROR("MQTT broker connection attempt failed.");
61  if (++nretry_ <= N_RETRY_ATTEMPTS)
62  {
63  reconnect();
64  }
65  else
66  {
67  ERROR("Maximum retry attempts reached. We will not try to reconnect to the MQTT broker anymore.");
68  }
69  }
70 
71  void on_success(const mqtt::token& tok) override
72  {
73  }
74 
75  void connected(const std::string& cause) override
76  {
77  INFO("Connected successfully to the MQTT broker.");
78 
79  for (const auto& topic : server_.topics())
80  {
81  cli_->subscribe(server_.config()->subscribe_prefix() + topic->subject(), QOS);
82  }
83  }
84 
85  void connection_lost(const std::string& cause) override
86  {
87  WARN("Connection lost to the MQTT broker.");
88  if (!cause.empty())
89  {
90  INFO(cause);
91  }
92 
93  WARN("Reconnecting to MQTT broker...");
94  nretry_ = 0;
95  reconnect();
96  }
97 
98  void message_arrived(mqtt::const_message_ptr msg) override
99  {
100  INFO("MQTT message arrived:"
101  << "\n\t Topic: " << msg->get_topic()
102  << "\n\t Payload: " << msg->to_string());
103 
104  server_.handle_mqtt_msg(msg);
105  }
106 
107  void delivery_complete(mqtt::delivery_token_ptr tok) override
108  {
109  }
110 
111 public:
112  mqtt_callback(std::shared_ptr<mqtt::async_client> cli, mqtt::connect_options& connOpts, MqttExternalServer& server)
113  : nretry_(0), cli_(cli), connOpts_(connOpts), server_(server)
114  {
115  }
116 };
117 
119  std::shared_ptr<const MqttServerConfig> config,
120  const std::vector<std::shared_ptr<const MqttExternalMessage>> &topics)
121  : sock_(ctx, zmqpp::socket_type::rep)
122  , bus_push_(ctx, zmqpp::socket_type::push)
123  , config_(config)
124  , topics_(topics)
125 {
126  bus_push_.connect("inproc://zmq-bus-pull");
127 
128  sock_.bind("inproc://" + config->name());
129 
130  for (const auto &topic : topics)
131  {
132  if (topic->direction() == Hardware::ExternalMessage::Direction::Publish)
133  {
134  auto topic_sock = zmqpp::socket(ctx, zmqpp::socket_type::rep);
135  topic_sock.bind("inproc://" + topic->name());
136  topic_socks_.insert(
137  topic_socks_.end(),
138  std::pair<std::string, zmqpp::socket>(
139  topic->name(),
140  std::move(topic_sock)
141  )
142  );
143  }
144  }
145 }
146 
147 void MqttExternalServer::register_sockets(zmqpp::reactor *reactor)
148 {
149  reactor->add(sock_,
150  std::bind(&MqttExternalServer::handle_request, this));
151  for (auto const& t : topic_socks_)
152  {
153  reactor->add(t.second,
154  std::bind(&MqttExternalServer::handle_topic_request, this, t.first));
155  }
156 }
157 
159 {
160  client_ = std::make_shared<mqtt::async_client>(std::string(config_->ssl() ? "ssl" : "tcp") +
161  "://" + config_->host() + ":" + std::to_string(config_->port()),
162  config_->client_id());
163  auto connOpts = mqtt::connect_options_builder()
164  .clean_session()
165  .finalize();
166  if (!config_->username().empty())
167  {
168  connOpts.set_user_name(config_->username());
169  connOpts.set_password(config_->password());
170  }
171  if (config_->ssl())
172  {
173  connOpts.set_ssl(mqtt::ssl_options(config_->ssl_ca_certs(),
174  config_->ssl_client_certfile(),
175  config_->ssl_client_keyfile(),
176  "",
177  "ALL",
178  config_->ssl_insecure()));
179  }
180 
181  mqtt_callback cb(client_, connOpts, *this);
182  client_->set_callback(cb);
183 
184  client_->connect(connOpts, nullptr, cb);
185 }
186 
188 {
189  client_->disconnect();
190 }
191 
192 void MqttExternalServer::handle_topic_request(const std::string& topic_name)
193 {
194  zmqpp::message msg;
195  auto& topic_sock = topic_socks_.at(topic_name);
196  topic_sock.receive(msg);
197  std::string topic_value;
198  msg >> topic_value;
199 
200  mqtt::message_ptr pubmsg;
201  std::shared_ptr<const MqttExternalMessage> extmsg;
202  for (const auto topic : topics_)
203  {
204  if (topic->name() == topic_name)
205  {
206  extmsg = topic;
207  }
208  }
209  ASSERT_LOG(extmsg != nullptr, "Unknown topic.");
210 
211  bool interupt = false;
212  if (extmsg->virtualtype() == DeviceClass::GPIO || extmsg->virtualtype() == DeviceClass::LED || extmsg->virtualtype() == DeviceClass::BUZZER)
213  {
214  std::string state;
215  msg >> state; // should be ON or OFF
216  std::string payload = extmsg->payload();
217  if (payload.empty())
218  {
219  payload = state;
220  }
221  else
222  {
223  payload = boost::replace_all_copy(payload, "__PLACEHOLDER__", state);
224  }
225  INFO("Payload to be published as MQTT message:" << payload);
226  pubmsg = mqtt::make_message(config_->publish_prefix() + extmsg->name(), payload);
227  interupt = true;
228  }
229 
230  ASSERT_LOG(pubmsg != nullptr, "Unsupported topic virtual type for MQTT module.");
231  pubmsg->set_qos(QOS);
232 
233  try
234  {
235  client_->publish(pubmsg);
236 
237  topic_sock.send("OK");
238 
239  if (interupt)
240  {
241  zmqpp::message zmsg;
242  zmsg << ("S_INT:" + extmsg->name()) << topic_value; // Should be ON or OFF
243  bus_push_.send(zmsg);
244  }
245  }
246  catch(const mqtt::exception& e)
247  {
248  topic_sock.send("KO");
249  ERROR(e.what());
250  }
251 }
252 
253 void MqttExternalServer::handle_mqtt_msg(mqtt::const_message_ptr msg)
254 {
255  std::shared_ptr<const MqttExternalMessage> extmsg;
256  for (const auto topic : topics_)
257  {
258  if (config_->subscribe_prefix() + topic->subject() == msg->get_topic())
259  {
260  extmsg = topic;
261  }
262  }
263  ASSERT_LOG(extmsg != nullptr, "Unknown topic.");
264 
265  const std::string payload = extmsg->payload();
266  std::string msg_value = msg->to_string();
267  if (!payload.empty())
268  {
269  auto msg_json = nlohmann::json::parse(msg->to_string());
270  auto d = msg_json.at(payload);
271  if (!d.is_null())
272  {
273  msg_value = d.get<std::string>();
274  }
275  else
276  {
277  ERROR("Cannot found json key `" << payload << "` on received MQTT message.");
278  }
279  }
280  INFO("Extracted value to be forwarded to ZMQ: " << msg_value);
281 
282  if (extmsg->virtualtype() == DeviceClass::GPIO || extmsg->virtualtype() == DeviceClass::LED || extmsg->virtualtype() == DeviceClass::BUZZER)
283  {
284  if (msg_value == "1")
285  {
286  msg_value = "ON";
287  }
288  else if (msg_value == "0")
289  {
290  msg_value = "OFF";
291  }
292 
293  zmqpp::message zmsg;
294  zmsg << ("S_INT:" + extmsg->name()) << msg_value; // Should be ON or OFF
295  bus_push_.send(zmsg);
296  }
297  else if (extmsg->virtualtype() == DeviceClass::RFID_READER)
298  {
299  zmqpp::message zmsg;
300  zmsg << ("S_" + extmsg->name()) << Leosac::Auth::SourceType::SIMPLE_CSN
301  << msg_value;
302  bus_push_.send(zmsg);
303  }
304 }
305 
307 {
308  zmqpp::message msg;
309  std::string str;
310  sock_.receive(msg);
311 
312  msg >> str;
313  assert(str == "CONNECT" || str == "DISCONNECT");
314  if (str == "CONNECT")
315  {
316  msg.pop_front();
317  try
318  {
319  connect();
320  sock_.send("OK");
321  }
322  catch(const mqtt::exception& e)
323  {
324  ERROR(e.what());
325  sock_.send("KO");
326  }
327  }
328  else if (str == "DISCONNECT")
329  {
330  msg.pop_front();
331  try
332  {
333  disconnect();
334  sock_.send("OK");
335  }
336  catch(const mqtt::exception& e)
337  {
338  ERROR(e.what());
339  sock_.send("KO");
340  }
341  }
342 }
343 
344 const std::string& MqttExternalServer::name() const
345 {
346  return config_->name();
347 }
348 
349 const std::vector<std::shared_ptr<const MqttExternalMessage>>& MqttExternalServer::topics() const
350 {
351  return topics_;
352 }
353 
354 std::shared_ptr<const MqttServerConfig> MqttExternalServer::config() const
355 {
356  return config_;
357 }
mqtt_callback::server_
MqttExternalServer & server_
Definition: MqttExternalServer.cpp:42
Leosac::Auth
Holds classes relevant to the Authentication and Authorization subsystem.
Definition: AccessPoint.hpp:27
Leosac::Module::Mqtt::MqttExternalServer::topics_
std::vector< std::shared_ptr< const MqttExternalMessage > > topics_
Definition: MqttExternalServer.hpp:99
WARN
@ WARN
Definition: log.hpp:33
zmqpp
Definition: CoreUtils.hpp:27
mqtt_callback::cli_
std::shared_ptr< mqtt::async_client > cli_
Definition: MqttExternalServer.cpp:39
Leosac::Module::Mqtt::MqttExternalServer::handle_mqtt_msg
void handle_mqtt_msg(mqtt::const_message_ptr msg)
Message received from mqtt.
Definition: MqttExternalServer.cpp:253
ERROR
@ ERROR
Definition: log.hpp:32
mqtt_callback::message_arrived
void message_arrived(mqtt::const_message_ptr msg) override
Definition: MqttExternalServer.cpp:98
Auth.hpp
ASSERT_LOG
#define ASSERT_LOG(cond, msg)
Definition: log.hpp:190
Leosac::Module::Mqtt::MqttExternalServer::register_sockets
void register_sockets(zmqpp::reactor *reactor)
Definition: MqttExternalServer.cpp:147
Leosac::Module::Mqtt::MqttExternalServer::handle_request
void handle_request()
Someone sent a request.
Definition: MqttExternalServer.cpp:306
Leosac::Hardware::ExternalMessage::payload
std::string payload() const
Definition: ExternalMessage.cpp:65
Leosac::Module::Mqtt::MqttExternalServer::bus_push_
zmqpp::socket bus_push_
Socket to write to the message bus.
Definition: MqttExternalServer.hpp:85
Leosac::Hardware::ExternalMessage::Direction::Publish
@ Publish
INFO
@ INFO
Definition: log.hpp:34
Leosac::Module::Mqtt::MqttExternalServer::config_
std::shared_ptr< const MqttServerConfig > config_
Definition: MqttExternalServer.hpp:97
mqtt_callback::mqtt_callback
mqtt_callback(std::shared_ptr< mqtt::async_client > cli, mqtt::connect_options &connOpts, MqttExternalServer &server)
Definition: MqttExternalServer.cpp:112
mqtt_callback::connected
void connected(const std::string &cause) override
Definition: MqttExternalServer.cpp:75
Leosac::Module::Mqtt::MqttExternalServer::client_
std::shared_ptr< mqtt::async_client > client_
The MQTT client.
Definition: MqttExternalServer.hpp:95
Leosac::Module::Mqtt::MqttExternalServer
Implementation class, for use by the Mqtt module only.
Definition: MqttExternalServer.hpp:38
mqtt_callback
Definition: MqttExternalServer.cpp:33
Leosac::Auth::SourceType::SIMPLE_CSN
@ SIMPLE_CSN
A simple Card Serial Number.
mqtt_callback::connection_lost
void connection_lost(const std::string &cause) override
Definition: MqttExternalServer.cpp:85
MqttExternalServer.hpp
N_RETRY_ATTEMPTS
const int N_RETRY_ATTEMPTS
Definition: MqttExternalServer.cpp:31
Leosac::Module::Mqtt::MqttExternalServer::name
const std::string & name() const
Definition: MqttExternalServer.cpp:344
Leosac::Module::Mqtt::MqttExternalServer::topics
const std::vector< std::shared_ptr< const MqttExternalMessage > > & topics() const
Definition: MqttExternalServer.cpp:349
Leosac::Hardware::Device::name
const std::string & name() const
Definition: Device.cpp:55
Leosac::Module::Mqtt::MqttExternalServer::handle_topic_request
void handle_topic_request(const std::string &topic_name)
Request received on the topic specific socket.
Definition: MqttExternalServer.cpp:192
Leosac::Module::Mqtt::MqttExternalServer::topic_socks_
std::map< std::string, zmqpp::socket > topic_socks_
REP socket to receive command on for topics.
Definition: MqttExternalServer.hpp:90
Leosac::Module::Mqtt::MqttExternalServer::connect
void connect()
Definition: MqttExternalServer.cpp:158
Leosac::Module::Mqtt
Provide support for Mqtt.
Definition: MqttConfig.cpp:28
Leosac::Module::Mqtt::MqttExternalServer::config
std::shared_ptr< const MqttServerConfig > config() const
Definition: MqttExternalServer.cpp:354
log.hpp
Leosac::Hardware::ExternalMessage::virtualtype
DeviceClass virtualtype() const
Definition: ExternalMessage.cpp:55
mqtt_callback::on_success
void on_success(const mqtt::token &tok) override
Definition: MqttExternalServer.cpp:71
Leosac::Module::Mqtt::MqttExternalServer::MqttExternalServer
MqttExternalServer(zmqpp::context &ctx, std::shared_ptr< const MqttServerConfig > config, const std::vector< std::shared_ptr< const MqttExternalMessage >> &topics)
Definition: MqttExternalServer.cpp:118
Leosac::Module::Mqtt::MqttExternalServer::sock_
zmqpp::socket sock_
REP socket to receive command on.
Definition: MqttExternalServer.hpp:80
mqtt_callback::on_failure
void on_failure(const mqtt::token &tok) override
Definition: MqttExternalServer.cpp:58
Leosac::Hardware
Provides facade classes to hardware device implementation.
Definition: Buzzer.cpp:25
Leosac::Module::Mqtt::MqttExternalServer::disconnect
void disconnect()
Definition: MqttExternalServer.cpp:187
mqtt_callback::connOpts_
mqtt::connect_options & connOpts_
Definition: MqttExternalServer.cpp:41
mqtt_callback::reconnect
void reconnect()
Definition: MqttExternalServer.cpp:44
mqtt_callback::delivery_complete
void delivery_complete(mqtt::delivery_token_ptr tok) override
Definition: MqttExternalServer.cpp:107
QOS
const int QOS
Definition: MqttExternalServer.cpp:30
mqtt_callback::nretry_
int nretry_
Definition: MqttExternalServer.cpp:37