[zeromq-dev] Exiting a child thread

Patrick Boettcher patrick.boettcher at posteo.de
Tue Apr 17 15:37:59 CEST 2018


Hi Giordano,


On Tue, 17 Apr 2018 13:10:33 +0000
"Cerizza, Giordano" <cerizza at nscl.msu.edu> wrote:

> Hi,
> I have the following problem: I have a source of messages connected
> to a proxy; the PULL/PUSH proxy reroutes the messages via inproc to a
> multithreaded service that receives the messages and pushes them to a
> destination sink. Here some pseudocode to help understand the
> streaming class:
> 
> void main(){
> // definition of context (ctx_) and sockets for PULL/PUSH proxy
> (frontend,backend) (...)
> pthread worker[N]
> for loop for number of thread
>    pthread_create(&worker[i], NULL, worker_task, (void*)ctx_)
> try{
>    zmq::proxy (*frontend, *backend, NULL);
> } catch(...){
>   // stuff
> }
> }

(side-note: if you can, try using std::thread in C++)
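
To illustrate the side-note, here is a minimal sketch of the same
start-and-join pattern with std::thread. SharedState and run_workers
are hypothetical names made up for this example; SharedState only
stands in for whatever the workers share (the context, in your case).

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for the state shared by all workers.
struct SharedState {
    std::atomic<int> handled{0};
};

// The worker takes a typed reference instead of a void*, and it ends
// by returning normally; no pthread_exit() call is needed.
void worker_task(SharedState& state) {
    state.handled.fetch_add(1);
}

// Start n workers, wait for all of them, and report how many ran.
int run_workers(std::size_t n) {
    SharedState state;
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < n; ++i)
        workers.emplace_back(worker_task, std::ref(state));
    for (auto& w : workers)
        w.join();
    return state.handled.load();
}
```

Calling run_workers(4) starts four threads and returns once all of
them have been joined.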
 
> void* worker_task(void *arg){
> zmq::context_t *context = (zmq::context_t*) arg;
> // definition of the sockets to pull from (socket_from) and push to
> (socket_to) (...)
> // operations on messages (receive and send)
> (...)
> // exiting the worker_task
> pthread_exit(NULL);
> }
> 
> Observations: my system hangs before exiting the child thread. I
> tried to close the sockets (zmq_close) and destruct (zmq_term) the
> context for the child but nothing happens. How do I safely leave the
> thread and move on with my code i.e. close the sockets and destroy
> the context for the proxy, destruct the streaming class, and move to
> operate on the message sink?

There are several ways to solve this problem. I use one that has been
known for a long time (since before ZMQ existed): the self-pipe
mechanism, which is traditionally implemented with a pipe. In ZMQ I use
inproc sockets for that.

You create two ZMQ_PAIR sockets; one binds and the other connects to
the same inproc:// endpoint. In your thread you poll on your data
socket(s) and on one of the two inproc sockets. If poll reports an
event ('revents') on that inproc socket, you clean up and exit your
thread.

In the main-thread, when you want to terminate your program and thus
your thread, you write "something" on the other inproc-socket and then
you call pthread_join on your thread, knowing that it will terminate
correctly very soon.
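
For comparison, the classic pipe-based form of the mechanism (without
ZMQ) can be sketched roughly as below. This is only an illustration
under assumptions: it needs a POSIX system, run_self_pipe_demo is a
hypothetical name, and the worker watches nothing but the wake-up pipe,
whereas a real worker would poll its data descriptors too.

```cpp
#include <cerrno>
#include <poll.h>
#include <thread>
#include <unistd.h>

// Returns true if the worker left its loop because of the wake-up byte.
bool run_self_pipe_demo() {
    int fds[2];  // fds[0]: read end (worker), fds[1]: write end (main)
    if (pipe(fds) != 0)
        return false;

    bool stopped_by_pipe = false;
    std::thread worker([&] {
        for (;;) {
            // A real worker would list its data descriptors here as well.
            pollfd item{fds[0], POLLIN, 0};
            int ret = poll(&item, 1, 5000);  // 5 s timeout
            if (ret < 0) {
                if (errno == EINTR)
                    continue;                // interrupted, poll again
                break;                       // real poll error
            }
            if (ret == 0)
                continue;                    // timeout, nothing to do
            if (item.revents != 0) {         // shutdown requested
                stopped_by_pipe = true;
                break;
            }
        }
    });

    // Main thread: write "something" to wake the worker, then join.
    const char wake = 'x';
    ssize_t written = write(fds[1], &wake, 1);
    // Closing the write end also raises an event on the read end, so
    // the worker wakes up even if the write above had failed.
    close(fds[1]);
    worker.join();
    close(fds[0]);
    return written == 1 && stopped_by_pipe;
}
```

The inproc variant follows the same shape: the PAIR sockets replace the
two pipe ends and zmq::poll replaces poll().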

Here is some (pseudo)-code: Main-thread, creation and thread-start

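  // create two PAIR sockets (ctx_ is assumed to be your zmq::context_t)
  internal_[0] = std::make_shared<zmq::socket_t>(ctx_, ZMQ_PAIR);
  internal_[1] = std::make_shared<zmq::socket_t>(ctx_, ZMQ_PAIR);

  // a unique inproc endpoint per instance
  std::string inprocEp = "inproc://#" + std::to_string(internalCount_.fetch_add(1));
  internal_[0]->bind(inprocEp);
  internal_[1]->connect(inprocEp);

  // linger value is an assumption: 0 discards pending messages on close
  const int linger = 0;
  internal_[0]->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
  internal_[1]->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));

  // start the thread
  receiverThread_ = std::thread(&classname::threadMethod, this);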

In thread:

  while (1) {
    zmq::pollitem_t items[] = {{*socket, 0, ZMQ_POLLOUT, 0},
                               {*internal_[0], 0, ZMQ_POLLIN, 0}};

    auto ret = zmq::poll(items, 2, 5000); // 5 secs
    // check ret 
    [..]
    // an event on internal socket -> shutdown requested
    if (items[1].revents != 0) 
      break;
  }

In main-thread ending code:

  internal_[1]->send("HUP", 3);
  receiverThread_.join();

The variables used here:

  static std::atomic<unsigned> internalCount_;
  std::shared_ptr<zmq::socket_t> internal_[2];

  std::thread receiverThread_;

This code uses C++11 types; I hope you can use them as well.

HTH,
--
Patrick.



