[zeromq-dev] Exiting a child thread
Patrick Boettcher
patrick.boettcher at posteo.de
Tue Apr 17 16:16:36 CEST 2018
Hi again,
I was so happy to share the self-pipe trick that I must have
overlooked the zmq_proxy part of your description. Sorry for that; I
think my answer does not help with your problem - at least this time.
regards,
--
Patrick.
On Tue, 17 Apr 2018 15:37:59 +0200
Patrick Boettcher <patrick.boettcher at posteo.de> wrote:
> Hi Giordano,
>
>
> On Tue, 17 Apr 2018 13:10:33 +0000
> "Cerizza, Giordano" <cerizza at nscl.msu.edu> wrote:
>
> > Hi,
> > I have the following problem: I have a source of messages connected
> > to a proxy; the PULL/PUSH proxy reroutes the messages via inproc to
> > a multithreaded service that receives the messages and pushes them
> > to a destination sink. Here is some pseudocode to help understand the
> > streaming class:
> >
> > int main(){
> >   // definition of context (ctx_) and sockets for PULL/PUSH proxy
> >   // (frontend, backend) (...)
> >   pthread_t worker[N];
> >   for (int i = 0; i < N; ++i)
> >     pthread_create(&worker[i], NULL, worker_task, (void*)ctx_);
> >   try {
> >     zmq::proxy(*frontend, *backend, NULL);
> >   } catch(...) {
> >     // stuff
> >   }
> > }
>
> (side-note: if you can, try using std::thread in C++)
>
> > void* worker_task(void *arg){
> >   zmq::context_t *context = (zmq::context_t*) arg;
> >   // definition of the sockets to pull from (socket_from) and push to
> >   // (socket_to) (...)
> >   // operations on messages (receive and send)
> >   // (...)
> >   // exiting the worker_task
> >   pthread_exit(NULL);
> > }
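As a rough illustration of the side note about std::thread, here is a minimal sketch of the worker start-up without pthreads. FakeContext, run_workers and the counter are hypothetical stand-ins that are not part of the original code; in the real program the shared object would be the zmq::context_t, and each worker would create its sockets from it.

```cpp
#include <atomic>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for the shared zmq::context_t.
struct FakeContext {
    std::atomic<int> workers_done{0};
};

// Worker body: in the real code the sockets would be created and used here.
void worker_task(FakeContext& ctx) {
    // ... receive and send messages ...
    ctx.workers_done.fetch_add(1);
    // Returning ends the thread and runs the destructors of local objects,
    // so no explicit pthread_exit() call is needed.
}

// Starts n workers, waits for all of them, and reports how many finished.
int run_workers(FakeContext& ctx, unsigned n) {
    std::vector<std::thread> workers;
    workers.reserve(n);
    for (unsigned i = 0; i < n; ++i)
        workers.emplace_back(worker_task, std::ref(ctx));
    for (auto& w : workers)
        w.join();  // blocks until the worker function has returned
    return ctx.workers_done.load();
}
```

The context is passed by reference through std::ref, which replaces the void* cast needed by pthread_create.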
> >
> > Observations: my system hangs before exiting the child thread. I
> > tried to close the sockets (zmq_close) and destruct (zmq_term) the
> > context for the child but nothing happens. How do I safely leave the
> > thread and move on with my code i.e. close the sockets and destroy
> > the context for the proxy, destruct the streaming class, and move to
> > operate on the message sink?
>
> There are several approaches to this problem. I use one that has been
> known for a long time (since before ZMQ existed): a pipe-based
> mechanism called the self-pipe trick. In ZMQ I use inproc sockets for
> that.
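For reference, the classic (non-ZMQ) form of the self-pipe trick can be sketched with a POSIX pipe and poll(). This is only an illustration under assumptions: SelfPipeWorker and its members are made-up names, the worker does no real work, and error handling is minimal.

```cpp
#include <poll.h>
#include <unistd.h>

#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical helper showing the self-pipe trick with a plain POSIX pipe.
struct SelfPipeWorker {
    int fds[2] = {-1, -1};            // fds[0]: read end (worker), fds[1]: write end (main)
    std::atomic<bool> exited{false};  // set by the worker just before it returns
    std::thread thread;

    // Creates the pipe and starts the worker thread.
    bool start() {
        if (pipe(fds) != 0)
            return false;
        thread = std::thread([this] { run(); });
        return true;
    }

    // Worker loop: wait on the pipe (real code would also poll its data fds).
    void run() {
        for (;;) {
            pollfd item{};
            item.fd = fds[0];
            item.events = POLLIN;
            int ret = poll(&item, 1, 5000);  // 5 second timeout
            if (ret < 0)
                break;                       // poll error: give up
            if (ret > 0 && item.revents != 0)
                break;                       // wake-up byte or hang-up: shutdown requested
            // ret == 0: timeout, periodic work would go here
        }
        exited = true;
    }

    // Called from the main thread: wake the worker, then wait for it.
    bool stop() {
        assert(thread.joinable());
        const char byte = 'x';
        bool sent = write(fds[1], &byte, 1) == 1;
        close(fds[1]);  // closing the write end also wakes the reader
        thread.join();
        close(fds[0]);
        return sent;
    }
};
```

A caller would invoke start() once, and later stop() from the main thread. Note that stop() must be called before the object is destroyed, because destroying a joinable std::thread terminates the program.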
>
> You create two ZMQ_PAIR sockets which bind/connect to the
> same inproc:// endpoint. In your thread you poll on your
> data socket(s) and on one of the inproc sockets. If 'revents' is
> set on this socket, you clean up and exit your thread.
>
> In the main-thread, when you want to terminate your program and thus
> your thread, you write "something" on the other inproc-socket and then
> you call pthread_join on your thread, knowing that it will terminate
> correctly very soon.
>
> Here is some (pseudo)-code: Main-thread, creation and thread-start
>
> // create two sockets bound/connected to a unique inproc endpoint:
> std::string inprocEp = "inproc://#" +
>     std::to_string(internalCount_.fetch_add(1));
> internal_[0]->bind(inprocEp);
> internal_[1]->connect(inprocEp);
>
> // 'linger' is an int defined elsewhere (its value is not shown here)
> internal_[0]->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
> internal_[1]->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
>
> receiverThread_ = std::thread(&classname::threadMethod, this);
>
> In thread:
>
> while (1) {
>   // data socket first, shutdown socket second; poll the data socket
>   // for ZMQ_POLLIN instead if the thread reads from it
>   zmq::pollitem_t items[] = {{*socket, 0, ZMQ_POLLOUT, 0},
>                              {*internal_[0], 0, ZMQ_POLLIN, 0}};
>
>   auto ret = zmq::poll(items, 2, 5000); // 5 secs
>   // check ret
>   [..]
>   // an event on internal socket -> shutdown requested
>   if (items[1].revents != 0)
>     break;
> }
>
> In main-thread ending code:
>
> internal_[1]->send("HUP", 3);
> receiverThread_.join();
>
> The variables used here:
>
> static std::atomic<unsigned> internalCount_;
> std::shared_ptr<zmq::socket_t> internal_[2];
>
> std::thread receiverThread_;
>
> This code uses C++11 types; I hope you can use them as well.
>
> HTH,
> --
> Patrick.