[zeromq-dev] Problems with PUB/SUB sockets and OpenPGM

Jens Mehnert jens.mehnert at smeet.com
Wed Jun 30 11:44:44 CEST 2010


I'm currently working on an embedded broker that runs inside our various
services so that the different parts of our system can share messages.
I'm using a producer socket and a consumer socket, each connecting to a
different port on a forwarder with the following configuration:

<forwarder>
<in>
<bind addr = "tcp://127.0.0.1:5555"/>
</in>
<out>
<bind addr = "tcp://127.0.0.1:5556"/>
</out>
</forwarder>

The forwarder solution is only for some of our developer systems
(Windows, MacOS). In other environments the producers and consumers
should send and receive the messages via multicast (OpenPGM).
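
A minimal sketch of how the endpoint could be chosen per platform,
assuming the decision is made from the OS name. The class name, the OS
check, and the epgm interface and multicast group are placeholders for
illustration, not our actual setup; only the two tcp addresses come from
the forwarder configuration above.

```java
import java.util.Locale;

// Hypothetical helper, not part of the broker code: picks the endpoint a
// producer or consumer should connect to, depending on the platform.
final class EndpointSelector {

    // Forwarder "in" and "out" addresses from the configuration above.
    static final String FORWARDER_IN = "tcp://127.0.0.1:5555";
    static final String FORWARDER_OUT = "tcp://127.0.0.1:5556";

    // Placeholder epgm endpoint in the form "epgm://interface;group:port".
    // The interface name and multicast group are made-up values.
    static final String MULTICAST = "epgm://eth0;239.192.1.1:5555";

    // Developer systems (Windows, MacOS) go through the forwarder.
    static boolean isDeveloperSystem(String osName) {
        String os = osName.toLowerCase(Locale.ROOT);
        return os.contains("windows") || os.contains("mac");
    }

    // Producers connect to the forwarder's "in" side or to the multicast group.
    static String producerEndpoint(String osName) {
        return isDeveloperSystem(osName) ? FORWARDER_IN : MULTICAST;
    }

    // Consumers connect to the forwarder's "out" side or to the multicast group.
    static String consumerEndpoint(String osName) {
        return isDeveloperSystem(osName) ? FORWARDER_OUT : MULTICAST;
    }

    public static void main(String[] args) {
        String os = System.getProperty("os.name");
        System.out.println("producer: " + producerEndpoint(os));
        System.out.println("consumer: " + consumerEndpoint(os));
    }
}
```

The returned strings would then be passed to connect() on the PUB and
SUB sockets instead of the hard-coded tcp addresses.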

The following code shows that I'm using two contexts. I have to do it
that way because otherwise the send thread blocks. This leads to my
first question: why do I need two contexts here, or rather, why does the
code block if I use only one context? Here is the shortened code of a
first prototype:

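import java.util.concurrent.ExecutorService;
import org.zeromq.ZMQ;
// Imports of our in-house classes (FmtLogger, Marshaller, ...) omitted.

public class ZmqBroker {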

     private static final FmtLogger LOG =
             FmtLogger.getLogger(ZmqBroker.class);

     private final ExecutorService _executorService =
             ExecutorServiceFactory.newFixedThreadPool("ZmqDispatcher", 1);

     private Marshaller _marshaller = new MarshallerJavaSerialization();
     private static final ZMQ.Context PRODUCER_CONTEXT = ZMQ.context (1);
     private static final ZMQ.Context CONSUMER_CONTEXT = ZMQ.context (1);
     private static final ZMQ.Socket PRODUCER_SOCKET =
             PRODUCER_CONTEXT.socket(ZMQ.PUB);
     private static final ZMQ.Socket CONSUMER_SOCKET =
             CONSUMER_CONTEXT.socket(ZMQ.SUB);

     public void init() {
         PRODUCER_SOCKET.connect("tcp://127.0.0.1:5555");
         CONSUMER_SOCKET.setsockopt(ZMQ.SUBSCRIBE, "");
         CONSUMER_SOCKET.connect("tcp://127.0.0.1:5556");
         _executorService.execute(new Dispatcher(new ZmqMessageHandler()));
     }

     public synchronized void send(Message message) {
         byte[] data =
                 _marshaller.marshal(ZmqMessageFactory.getZmqTransport(message));
         PRODUCER_SOCKET.send(data, 0);
     }

     private static class Dispatcher implements Runnable {

         private final Marshaller _marshaller =
                 new MarshallerJavaSerialization();
         private final MessageHandler<Message> _handler;

         private Dispatcher(MessageHandler<Message> handler) {
             _handler = handler;
         }

         public void run() {
             while (!Thread.currentThread().isInterrupted()) {
                 try {
                     byte[] rawMessage = CONSUMER_SOCKET.recv(0);
                     Message message = _marshaller.unmarshal(rawMessage);
                     _handler.handleMessage(message);
                 } catch (Exception e) {
                     LOG.error("An error occurred handling zmq message ...");
                 }
             }
         }

     }
}

My second problem: when I use two contexts, I run into trouble as soon
as I change the protocol to epgm. In that case I get the following
error:
Assertion failed: !pgm_supported () (zmq.cpp:239)

Best regards, Jens