29 #include <boost/archive/binary_iarchive.hpp>
30 #include <boost/archive/text_iarchive.hpp>
31 #include <boost/archive/text_oarchive.hpp>
32 #include <boost/property_tree/ptree_serialization.hpp>
33 #include <boost/regex.hpp>
35 #include <zmqpp/curve.hpp>
40 const boost::property_tree::ptree &cfg)
42 , socket_(ctx,
zmqpp::socket_type::router)
47 auth_.configure_curve(
"CURVE_ALLOW_ANY");
53 int port = cfg.get<
int>(
"port");
58 INFO(
"Enabling Remote Control:"
60 <<
"Port: " << port <<
"\n\t "
66 std::placeholders::_2);
70 std::placeholders::_2);
74 std::placeholders::_2);
78 std::placeholders::_2);
82 std::placeholders::_2);
86 std::placeholders::_2);
88 socket_.set(zmqpp::socket_option::curve_server,
true);
91 socket_.bind(
"tcp://*:" + std::to_string(port));
101 assert(msg.parts() > 1);
106 msg.reset_read_cursor();
109 DEBUG(
"Remote Control command: " << frame1 <<
" with " << msg.parts()
112 std::string user_pubkey;
113 bool ret = msg.get_property(
"User-Id", user_pubkey);
128 assert(rep.parts() == 1);
130 <<
"Malformed message: " << frame1;
131 WARN(
"Received malformed message on Remote Control Interface. "
138 WARN(
"Request denied. Insuficient permission for user "
139 << user_pubkey <<
". Command was: " << frame1);
141 <<
"Insuficient permission";
148 WARN(
"Unknown message on Remote Control interface");
165 zmqpp::message *message_out)
170 std::vector<std::string> modules_names =
172 if (std::find(modules_names.begin(), modules_names.end(), module) !=
175 zmqpp::socket sock(
context_, zmqpp::socket_type::req);
176 sock.connect(
"inproc://module-" + module);
178 bool ret = sock.send(zmqpp::message() <<
"DUMP_CONFIG" << cfg_format);
184 *message_out <<
"OK";
185 *message_out << module;
189 while (rep.remaining())
199 ERROR(
"RemoteControl: Cannot retrieve local module configuration for {"
201 <<
"The module appears to not be loaded.");
203 <<
"Module not loaded, so config not available";
208 zmqpp::message *msg_out)
216 std::ostringstream oss;
217 boost::archive::text_oarchive archive(oss);
218 boost::property_tree::save(archive, cfg, 1);
220 msg_out->add(oss.str());
230 zmqpp::message *msg_out)
235 if (msg_in->remaining() >= 2)
237 std::string module_name;
239 *msg_in >> module_name >>
format;
247 zmqpp::message *msg_out)
252 if (msg_in->remaining() == 0)
267 if (msg_in->remaining() == 4)
269 std::string endpoint;
270 std::string remote_server_pubkey;
271 uint8_t sync_general_config;
274 *msg_in >> autocommit;
275 *msg_in >> remote_server_pubkey;
276 *msg_in >> sync_general_config;
279 auto fetch_task = std::make_shared<Tasks::FetchRemoteConfig>(
280 endpoint, remote_server_pubkey);
282 auto sync_task = std::make_shared<Tasks::SyncConfig>(
283 kernel_, fetch_task, sync_general_config, autocommit);
286 auto success_response_task =
287 std::make_shared<Tasks::RemoteControlAsyncResponse>(
289 zmqpp::message() <<
"Success" << sync_task->get_guid(),
socket_);
291 auto failure_response_task =
292 std::make_shared<Tasks::RemoteControlAsyncResponse>(
294 zmqpp::message() <<
"Failed" << sync_task->get_guid(),
socket_);
296 auto abort_response_task =
297 std::make_shared<Tasks::RemoteControlAsyncResponse>(
299 zmqpp::message() <<
"Aborted" << sync_task->get_guid(),
socket_);
301 sync_task->set_on_success([=]() {
302 success_response_task->run();
303 ASSERT_LOG(success_response_task->succeed(),
"TASK FAILED");
305 sync_task->set_on_failure([=]() {
306 failure_response_task->run();
307 ASSERT_LOG(failure_response_task->succeed(),
"TASK FAILED");
309 fetch_task->set_on_failure([=]() {
314 fetch_task->set_on_success([=]() {
315 DEBUG(
"FETCH TASK COMPLETE. WILL QUEUE SYNC_CONFIG");
321 *msg_out <<
"DELAYED" << sync_task->get_guid();
332 if (msg_in->remaining() == 0)
338 <<
"Saving config failed for some unkown reason.";
346 zmqpp::message *msg_out)
351 if (msg_in->remaining() == 1)
362 zmqpp::message *msg_out)
367 if (msg_in->remaining() == 0)