[zeromq-dev] Exiting a child thread

Patrick Boettcher patrick.boettcher at posteo.de
Tue Apr 17 16:16:36 CEST 2018


Hi again,

I was so happy to share the self-pipe trick that I must have
overlooked the zmq_proxy part of your description. Sorry about that; I
don't think my answer helps with your problem, at least not this time.

regards,
--
Patrick.

On Tue, 17 Apr 2018 15:37:59 +0200
Patrick Boettcher <patrick.boettcher at posteo.de> wrote:

> Hi Giordano,
> 
> 
> On Tue, 17 Apr 2018 13:10:33 +0000
> "Cerizza, Giordano" <cerizza at nscl.msu.edu> wrote:
> 
> > Hi,
> > I have the following problem: I have a source of messages connected
> > to a proxy; the PULL/PUSH proxy reroutes the messages via inproc to
> > a multithreaded service that receives the messages and pushes them
> > to a destination sink. Here is some pseudocode to help understand the
> > streaming class:
> > 
> > int main(){
> >   // definition of context (ctx_) and sockets for PULL/PUSH proxy
> >   // (frontend, backend) (...)
> >   pthread_t worker[N];
> >   for (int i = 0; i < N; i++)
> >     pthread_create(&worker[i], NULL, worker_task, (void*)ctx_);
> >   try {
> >     zmq::proxy(*frontend, *backend, NULL);
> >   } catch(...) {
> >     // stuff
> >   }
> > }  
> 
> (side-note: if you can, try using std::thread in C++)
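
A minimal sketch of that side-note, assuming the worker needs nothing but a shared counter. The names workerTask and runWorkers are hypothetical and not part of the original code. Returning from the thread function ends the thread, so no pthread_exit call is needed.

```cpp
#include <atomic>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for worker_task: it only counts invocations.
void workerTask(std::atomic<int>& counter) {
    counter.fetch_add(1);
}

// Starts n workers, waits for all of them, and returns how many ran.
int runWorkers(int n) {
    std::atomic<int> counter{0};
    std::vector<std::thread> workers;
    workers.reserve(n);
    for (int i = 0; i < n; ++i)
        workers.emplace_back(workerTask, std::ref(counter));
    for (auto& t : workers)
        t.join();  // replaces pthread_join
    return counter.load();
}
```

Arguments are passed with their real types (here by reference through std::ref), which avoids the void* casts needed by pthread_create.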
>  
> > void* worker_task(void *arg){
> >   zmq::context_t *context = (zmq::context_t*) arg;
> >   // definition of the sockets to pull from (socket_from) and push to
> >   // (socket_to) (...)
> >   // operations on messages (receive and send)
> >   (...)
> >   // exiting the worker_task
> >   pthread_exit(NULL);
> > }
> > 
> > Observations: my system hangs before exiting the child thread. I
> > tried to close the sockets (zmq_close) and destruct (zmq_term) the
> > context for the child but nothing happens. How do I safely leave the
> > thread and move on with my code i.e. close the sockets and destroy
> > the context for the proxy, destruct the streaming class, and move to
> > operate on the message sink?  
> 
> There are several approaches to this problem. The one I use has been
> known for a long time (since before ZMQ existed): it is done with a
> pipe, and the mechanism is called the self-pipe trick. In ZMQ I use
> inproc for that.
> 
> You create two ZMQ_PAIR sockets which bind/connect to the
> same inproc:// endpoint. In your thread you use poll on your
> data socket(s) and on one of the inproc sockets. If you get a
> 'revent' on this socket, you clean up and exit your thread.
> 
> In the main-thread, when you want to terminate your program and thus
> your thread, you write "something" on the other inproc-socket and then
> you call pthread_join on your thread, knowing that it will terminate
> correctly very soon.
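
For reference, the classic self-pipe mechanism mentioned above can be sketched without ZMQ, using a plain POSIX pipe and poll(). This is only a minimal illustration under assumptions: the class name StoppableWorker and its members are hypothetical, and a real worker would also list its data descriptors in the poll set.

```cpp
#include <poll.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <stdexcept>
#include <thread>

// Hypothetical helper: a worker thread that sleeps in poll() and leaves
// its loop as soon as a byte arrives on its private wake-up pipe.
class StoppableWorker {
public:
    StoppableWorker() {
        if (pipe(wake_) != 0) throw std::runtime_error("pipe() failed");
        thread_ = std::thread(&StoppableWorker::run, this);
    }

    ~StoppableWorker() {
        stop();
        close(wake_[0]);
        close(wake_[1]);
    }

    StoppableWorker(const StoppableWorker&) = delete;
    StoppableWorker& operator=(const StoppableWorker&) = delete;

    // Main-thread side: write "something" to the pipe, then join.
    void stop() {
        if (!thread_.joinable()) return;  // already stopped
        const char byte = 'x';
        while (write(wake_[1], &byte, 1) < 0 && errno == EINTR) {
            // interrupted by a signal: retry
        }
        thread_.join();
    }

    bool exited() const { return exited_.load(); }

private:
    void run() {
        for (;;) {
            // Only the wake-up descriptor is polled in this sketch.
            pollfd items[] = {{wake_[0], POLLIN, 0}};
            const int ret = poll(items, 1, 5000);  // 5 s timeout
            if (ret < 0 && errno != EINTR) break;  // unexpected poll error
            if (ret > 0 && items[0].revents != 0) break;  // shutdown requested
            // ret == 0: timeout, poll again
        }
        exited_.store(true);  // clean-up would go here
    }

    int wake_[2];                      // [0] read end, [1] write end
    std::atomic<bool> exited_{false};
    std::thread thread_;
};
```

The main thread only has to call stop(); the worker leaves its loop as soon as the byte becomes readable, so join() returns promptly instead of waiting for the timeout.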
> 
> Here is some (pseudo)-code: Main-thread, creation and thread-start
> 
>   // create two sockets: 
>   std::string inprocEp =
>       "inproc://#" + std::to_string(internalCount_.fetch_add(1));
>   internal_[0]->bind(inprocEp);
>   internal_[1]->connect(inprocEp);
> 
>   internal_[0]->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
>   internal_[1]->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
> 
>   receiverThread_ = std::thread(&classname::threadMethod, this);
> 
> In thread:
> 
>   while (1) {
>     zmq::pollitem_t items[] = {{*socket, 0, ZMQ_POLLIN, 0},
>                                {*internal_[0], 0, ZMQ_POLLIN, 0}};
> 
>     auto ret = zmq::poll(items, 2, 5000); // 5 secs
>     // check ret 
>     [..]
>     // an event on internal socket -> shutdown requested
>     if (items[1].revents != 0) 
>       break;
>   }
> 
> In main-thread ending code:
> 
>   internal_[1]->send("HUP", 3);
>   receiverThread_.join();
> 
> The variables used here:
> 
>   static std::atomic<unsigned> internalCount_;
>   std::shared_ptr<zmq::socket_t> internal_[2];
> 
>   std::thread receiverThread_;
> 
> This code uses C++11 types; I hope you can use them as well.
> 
> HTH,
> --
> Patrick.
> 
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev



