Leosac  0.8.0
Open Source Access Control
MessageBus.cpp
Go to the documentation of this file.
1 /*
2  Copyright (C) 2014-2016 Leosac
3 
4  This file is part of Leosac.
5 
6  Leosac is free software: you can redistribute it and/or modify
7  it under the terms of the GNU Affero General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  Leosac is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU Affero General Public License for more details.
15 
16  You should have received a copy of the GNU Affero General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #include "MessageBus.hpp"
21 #include "tools/ThreadUtils.hpp"
22 
23 MessageBus::MessageBus(zmqpp::context &ctx)
24  : ctx_(ctx)
25  , pub_(nullptr)
26  , pull_(nullptr)
27  , running_(true)
28 {
29  actor_ =
30  new zmqpp::actor(std::bind(&MessageBus::run, this, std::placeholders::_1));
31 }
32 
34 {
35  delete actor_;
36 }
37 
38 bool MessageBus::run(zmqpp::socket *pipe)
39 {
40  Leosac::set_thread_name("message_bus");
41  try
42  {
43  pub_ = new zmqpp::socket(ctx_, zmqpp::socket_type::pub);
44  pub_->bind("inproc://zmq-bus-pub");
45 
46  pull_ = new zmqpp::socket(ctx_, zmqpp::socket_type::pull);
47  pull_->bind("inproc://zmq-bus-pull");
48  }
49  catch (std::exception &e)
50  {
51  return false;
52  }
53 
54  pipe->send(zmqpp::signal::ok);
55 
56  zmqpp::reactor reactor;
57 
58  reactor.add(*pull_, std::bind(&MessageBus::handle_pull, this));
59  reactor.add(*pipe, std::bind(&MessageBus::handle_pipe, this, pipe));
60 
61  while (running_)
62  {
63  reactor.poll();
64  }
65  delete pull_;
66  delete pub_;
67  return true;
68 }
69 
70 void MessageBus::handle_pipe(zmqpp::socket *pipe)
71 {
72  zmqpp::signal sig;
73  pipe->receive(sig);
74 
75  if (sig == zmqpp::signal::stop)
76  {
77  running_ = false;
78  }
79 }
80 
82 {
83  zmqpp::message msg;
84 
85  pull_->receive(msg);
86  pub_->send(msg);
87 }
MessageBus::running_
bool running_
Definition: MessageBus.hpp:53
MessageBus::run
bool run(zmqpp::socket *pipe)
The method that will be run in the child thread.
Definition: MessageBus.cpp:38
Leosac::set_thread_name
void set_thread_name(const std::string &name)
Set the name of the current thread.
Definition: ThreadUtils.cpp:37
ThreadUtils.hpp
MessageBus.hpp
MessageBus::handle_pull
void handle_pull()
Definition: MessageBus.cpp:81
MessageBus::handle_pipe
void handle_pipe(zmqpp::socket *pipe)
Definition: MessageBus.cpp:70
MessageBus::MessageBus
MessageBus(zmqpp::context &ctx)
Definition: MessageBus.cpp:23
MessageBus::pull_
zmqpp::socket * pull_
Definition: MessageBus.hpp:47
MessageBus::pub_
zmqpp::socket * pub_
Definition: MessageBus.hpp:46
MessageBus::ctx_
zmqpp::context & ctx_
Definition: MessageBus.hpp:44
MessageBus::actor_
zmqpp::actor * actor_
Definition: MessageBus.hpp:36
MessageBus::~MessageBus
~MessageBus()
Definition: MessageBus.cpp:33