[zeromq-dev] ZeroMQ integration with Boost

Maciej Gajewski Maciej.Gajewski at tibra.com
Fri May 27 10:38:15 CEST 2011


I've managed to observe 0MQ sockets with a boost asio io_service in the following way:

I. The file descriptor returned by getsockopt ZMQ_FD is the descriptor of an internal stream socket that the io thread uses to send commands. You can wrap it in a boost asio stream descriptor.
Unfortunately, a boost stream descriptor closes the underlying descriptor when destroyed, so I had to modify the stream descriptor service so that it does not close the socket:

    typedef boost::asio::posix::stream_descriptor_service BaseStreamService;

    /// boost asio service doing everything except closing the socket
    /// (zmq is responsible for socket state management)
    template<typename BaseService>
    class NonClosingService : public BaseService
    {
    public:
        NonClosingService(boost::asio::io_service & io_service)
        : BaseService(io_service)
        {
        }

        /// Destroy a stream descriptor implementation.
        void destroy(typename BaseService::implementation_type& impl)
        {
            // do exactly nothing
        }
    };

    typedef NonClosingService<BaseStreamService> NonClosingStreamService;


    typedef boost::asio::posix::basic_stream_descriptor<NonClosingStreamService> StreamSocketWrapper;
    typedef boost::shared_ptr<StreamSocketWrapper> StreamSocketWrapperPtr;
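
The ownership problem behind step I can be shown without Boost or 0MQ. The following is only a standalone sketch using plain POSIX calls; OwningFd, BorrowedFd, isOpen and survivesWrapper are hypothetical names made up for this illustration, and a pipe stands in for the descriptor that ZMQ_FD would return.

```cpp
#include <cassert>
#include <fcntl.h>   // fcntl, F_GETFD
#include <unistd.h>  // pipe, close

// Hypothetical wrapper that owns its descriptor and closes it on destruction,
// which mirrors what a stock stream descriptor does.
class OwningFd {
public:
    explicit OwningFd(int fd) : fd_(fd) { assert(fd >= 0); }
    ~OwningFd() { ::close(fd_); }
    OwningFd(const OwningFd&) = delete;
    OwningFd& operator=(const OwningFd&) = delete;
    int get() const { return fd_; }
private:
    int fd_;
};

// Hypothetical wrapper that only borrows the descriptor: destroying it
// leaves the descriptor open for its real owner.
class BorrowedFd {
public:
    explicit BorrowedFd(int fd) : fd_(fd) { assert(fd >= 0); }
    int get() const { return fd_; }
private:
    int fd_;
};

// True if fd still refers to an open descriptor.
inline bool isOpen(int fd) { return ::fcntl(fd, F_GETFD) != -1; }

// Wraps the read end of a fresh pipe, lets the wrapper go out of scope,
// and reports whether the descriptor is still open afterwards.
template <typename Wrapper>
bool survivesWrapper() {
    int fds[2];
    if (::pipe(fds) != 0) return false;
    {
        Wrapper w(fds[0]);
        (void)w.get();
    }  // wrapper destroyed here
    const bool alive = isOpen(fds[0]);
    if (alive) ::close(fds[0]);
    ::close(fds[1]);
    return alive;
}
```

With an owning wrapper the descriptor is gone once the wrapper dies, which for a 0MQ socket would pull the descriptor out from under the library. The borrowed variant corresponds to what NonClosingService is meant to achieve.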

II. Create an ASIO socket wrapping the ZMQ FD

    int fd = -1;
    std::size_t fdsize = sizeof(fd);
    zmqSocket->getsockopt(ZMQ_FD, &fd, &fdsize);


    StreamSocketWrapperPtr asioSocket(new StreamSocketWrapper(mIoService, fd));

III. Use ASIO to asynchronously wait for the socket to become readable

    typedef boost::function<void (const boost::shared_ptr<zmq::socket>&)> HandlerType;

    // This is required to execute the function once at the beginning of the event loop, in case the internal FD is already readable (the io thread has sent something).
    // The io_service poller is edge-triggered and will not react if the socket is already readable.
    asioIoService.post(
        boost::bind(
            OnStreamSocketReadable,
            asioSocket,
            boost::weak_ptr<zmq::socket>(zmqSocket),
            handler,
            boost::system::error_code()));


    // Called when the stream socket is readable (and once at the beginning of the event loop).
    // Checks whether ZMQ has an incoming message and, if so, calls the handler.
    void OnStreamSocketReadable(
        const StreamSocketWrapperPtr& asioSocket,
        const boost::weak_ptr<zmq::socket>& weakSocket,
        const HandlerType& handler,
        const boost::system::error_code& error)
    {
        if (!error)
        {

            // handle it only if the socket still exists
            if (boost::shared_ptr<zmq::socket> zmqSocket = weakSocket.lock())
            {
                for(;;)
                {
                    // ok, so the underlying socket is readable. check if 0MQ socket is readable
                    boost::int32_t events = 0;
                    std::size_t eventsSize = sizeof(events);
                    zmqSocket->getsockopt(ZMQ_EVENTS, &events, &eventsSize);

                    if (events & ZMQ_POLLIN)
                    {
                        try
                        {
                            handler(zmqSocket);
                        }
                        catch(...)
                        {
                            // ignore all exceptions coming from the handler
                            LOG_ERROR("Stream socket handler threw!");
                        }
                        // note: we are in a loop, getsockopt(ZMQ_EVENTS) will be called again which is _essential_
                        // for the system to work properly.
                    }
                    else
                    {
                        // Stream socket readable, but 0MQ socket not, waiting for more data
                        break;
                    }
                }
                // re-insert the socket into asio's poller
                asioSocket->async_read_some(
                    boost::asio::null_buffers(),
                    boost::bind(
                        OnStreamSocketReadable,
                        asioSocket,
                        weakSocket,
                        handler,
                        _1));
            }
            else
            {
                // nothing to do: the 0MQ socket in question no longer exists
            }
        }
        else
        {
            // nothing to do here, no error handling (yet?)
            LOG_DEBUG("Error on stream socket: ", error);
        }
    }
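
Why the loop around getsockopt(ZMQ_EVENTS) matters can be sketched with a toy model. Everything below is an assumption made for illustration: MockSocket and leftoverAfterBurst are hypothetical names, and the "edge" counter is a simplified stand-in for an edge-triggered notification, not a description of how libzmq or asio work internally.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical stand-in for a 0MQ socket: a message queue plus a counter of
// notifications that is bumped only when the queue goes from empty to non-empty.
class MockSocket {
public:
    MockSocket() : edges_(0) {}

    void deliver(const std::string& msg) {
        if (queue_.empty()) ++edges_;  // edge: not readable -> readable
        queue_.push_back(msg);
    }

    // Plays the role of checking ZMQ_EVENTS for ZMQ_POLLIN.
    bool readable() const { return !queue_.empty(); }

    std::string receive() {
        assert(!queue_.empty());
        std::string msg = queue_.front();
        queue_.pop_front();
        return msg;
    }

    // Consumes one pending notification; returns false if there is none.
    bool takeEdge() {
        if (edges_ == 0) return false;
        --edges_;
        return true;
    }

    std::size_t pending() const { return queue_.size(); }

private:
    std::deque<std::string> queue_;
    int edges_;
};

// Delivers three messages back to back (a single edge), reacts to the
// notifications, and returns how many messages are left unread.
std::size_t leftoverAfterBurst(bool drain) {
    MockSocket s;
    s.deliver("a");
    s.deliver("b");
    s.deliver("c");
    while (s.takeEdge()) {
        if (drain) {
            while (s.readable()) s.receive();  // re-check after every message
        } else if (s.readable()) {
            s.receive();                       // one message per notification
        }
    }
    return s.pending();
}
```

In this model, handling one message per notification leaves two messages stranded until something new arrives, while re-checking readiness after every message empties the queue. That is the same reason the handler above stays in its for(;;) loop until ZMQ_POLLIN is no longer set.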



________________________________________
From: zeromq-dev-bounces at lists.zeromq.org [zeromq-dev-bounces at lists.zeromq.org] On Behalf Of Arthur Davis [silpertan+zmq at gmail.com]
Sent: 26 May 2011 06:43
To: zeromq-dev at lists.zeromq.org
Subject: [zeromq-dev] ZeroMQ integration with Boost

I noticed a discussion about integrating 0MQ with boost:
    http://boost.2283326.n4.nabble.com/0MQ-messaging-in-Boost-td2664202.html

I am working on a project that is using boost::asio facilities and 0MQ
sockets in the same process.  It would be nice if the
boost::asio::io_service features for handling async IO were able to
handle 0MQ sockets along with other sockets and timers.

Has any work been done in this area since the idea was tossed around
back in 2009?

Thanks,
Arthur



