[zeromq-dev] Pub/sub for a logger and multiprocessing

Frédéric fma at gbiloba.org
Fri Jul 22 18:01:17 CEST 2016


(sorry for the truncated mail!)

Hi!

I'm new to zeromq. I plan to use it for my multi-legged robot Python
framework (Py4bot¹).

As a first exercise, I wrote a custom logger, a singleton shared across the
application. As I'm switching from threads to processes, I'm trying to use
the zmq logging handler PUBHandler to receive logs remotely from all
processes, but it does not work out of the box in my case...
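
For reference, here is a minimal sketch of how pyzmq's PUBHandler could be
attached to a logger when the PUB socket has to connect to a forwarder
instead of binding. It is written in Python 3 syntax; the helper name
make_pub_handler and the "logger" root topic are illustrative assumptions,
and the port is the frontend port from the sample code below. As far as I
understand, PUBHandler derives the topic frame from root_topic and the
record's level name.

```python
import logging

import zmq
from zmq.log.handlers import PUBHandler

FRONTEND_PORT = 5559  # frontend port of the forwarder in the sample code


def make_pub_handler(port=FRONTEND_PORT):
    """Build a PUBHandler whose PUB socket connects rather than binds.

    Hypothetical helper, not part of pyzmq.
    """
    context = zmq.Context.instance()
    socket = context.socket(zmq.PUB)
    # Do not block at exit if nothing is listening on the other side.
    socket.setsockopt(zmq.LINGER, 0)
    socket.connect("tcp://localhost:%d" % port)

    # PUBHandler accepts an existing socket instead of an interface string.
    handler = PUBHandler(socket)
    handler.root_topic = "logger"
    return handler


if __name__ == "__main__":
    logger = logging.getLogger("test")
    logger.setLevel(logging.DEBUG)  # the default level would drop debug records
    logger.addHandler(make_pub_handler())
    logger.debug("hello from PUBHandler")
```

Passing a ready-made socket keeps the bind/connect decision in the caller,
which seems to be what a forwarder-based setup needs.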

In the logger's __init__() method, I create a sub-process and
instantiate a zmq forwarder device, as explained here:

https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/devices/forwarder.html

so I expect this device to be created only once, on the first call to the
logger.

Then, in my app, I create sub-processes using Python's
multiprocessing.Process class, but I get a zmq error:

ZMQError: Address already in use

I don't understand what's going on. Could somebody point me in the right
direction? Here is my sample code:

---------------------------------------

import time
import multiprocessing
import logging

import zmq
import zmq.utils.strtypes

FRONTEND_PORT = 5559
BACKEND_PORT = 5560


class Singleton(type):
    def __init__(self, *args, **kwargs):
        super(Singleton, self).__init__(*args, **kwargs)
        self._instance = None

    def __call__(self, *args, **kwargs):
        if self._instance is None:
            self._instance = super(Singleton, self).__call__(*args, **kwargs)

        return self._instance


class ZmqPubHandler(logging.Handler):
    def __init__(self):
        super(ZmqPubHandler, self).__init__()
        context = zmq.Context.instance()
        self._socket = context.socket(zmq.PUB)
        self._socket.connect("tcp://localhost:%d" % FRONTEND_PORT)

    def emit(self, record):
        try:
            bmsg = zmq.utils.strtypes.cast_bytes(self.format(record))
        except Exception:
            self.handleError(record)
            return

        self._socket.send_multipart(("logger", bmsg))


class Log(object):
    __metaclass__ = Singleton

    def __init__(self):
        super(Log, self).__init__()

        print "init Log"
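        self._logger = logging.getLogger("test")
        # Without an explicit level, debug() records are dropped
        # (the effective default level is WARNING).
        self._logger.setLevel(logging.DEBUG)
        handler = ZmqPubHandler()
        self._logger.addHandler(handler)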

        multiprocessing.Process(target=self._device).start()

    def _device(self):
        context = zmq.Context.instance()

        # Socket facing clients
        frontend = context.socket(zmq.SUB)
        frontend.bind("tcp://*:%d" % FRONTEND_PORT)

        frontend.setsockopt(zmq.SUBSCRIBE, "")

        # Socket facing services
        backend = context.socket(zmq.PUB)
        backend.bind("tcp://*:%d" % BACKEND_PORT)

        zmq.device(zmq.FORWARDER, frontend, backend)  # blocking call

    def debug(self, *args, **kwargs):
        self._logger.debug(*args, **kwargs)


class A:
    def loop(self):
        while True:
            print "A", time.time()
            Log().debug(time.time())
            time.sleep(1)

class B:
    def loop(self):
        while True:
            print "B", time.time()
            Log().debug(time.time())
            time.sleep(1)


if __name__ == "__main__":
    print "launch A..."
    multiprocessing.Process(target=A().loop).start()
    print "launch B..."
    multiprocessing.Process(target=B().loop).start()

    while True:
        time.sleep(0.001)

    print "end"

------------------------------------------------------
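
One thing that may be worth checking in the sample above, offered as an
assumption and not a confirmed diagnosis: a metaclass-based singleton is
unique per process, not across processes. The sketch below (Python 3
syntax, standard library only) counts how many processes run Log.__init__
when the parent never calls Log() itself. The names worker and
count_log_instances are illustrative, and the "fork" start method is
requested explicitly to mirror what Python 2 does on POSIX systems.

```python
import multiprocessing
import os


class Singleton(type):
    """Metaclass caching one instance per class, in the current process only."""

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        cls._instance = None

    def __call__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__call__(*args, **kwargs)
        return cls._instance


class Log(metaclass=Singleton):
    def __init__(self):
        # Stand-in for the real setup work (e.g. starting the forwarder device).
        self.created_in = os.getpid()


def worker(queue):
    # First call to Log() in this child: the parent never created an instance,
    # so the child builds its own.
    queue.put(Log().created_in)


def count_log_instances(n_workers=2):
    """Return how many distinct processes ended up running Log.__init__."""
    ctx = multiprocessing.get_context("fork")  # POSIX only
    queue = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(queue,)) for _ in range(n_workers)]
    for proc in procs:
        proc.start()
    pids = {queue.get() for _ in procs}
    for proc in procs:
        proc.join()
    return len(pids)


if __name__ == "__main__":
    print(count_log_instances())
```

If each child builds its own Log, each one would also start its own
forwarder device, and the second bind on the same ports could then fail
with "Address already in use".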

I also found some dedicated pyzmq classes (Device, Proxy, ProcessDevice...),
but I don't see how to use them.
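
As a starting point, here is a hedged sketch of what the forwarder from the
sample code might look like with zmq.devices.ProcessDevice (Python 3
syntax). The helper name make_forwarder is an illustrative assumption; the
ports are the ones used above. The bind_in/bind_out/setsockopt_in calls only
record the configuration; the sockets themselves are created in the
background process once start() is called.

```python
import zmq
from zmq.devices import ProcessDevice

FRONTEND_PORT = 5559
BACKEND_PORT = 5560


def make_forwarder(frontend_port=FRONTEND_PORT, backend_port=BACKEND_PORT):
    """Configure (but do not start) a forwarder running in its own process.

    Hypothetical helper, not part of pyzmq.
    """
    device = ProcessDevice(zmq.FORWARDER, zmq.SUB, zmq.PUB)

    # Socket facing the publishers (the log handlers connect here).
    device.bind_in("tcp://*:%d" % frontend_port)
    device.setsockopt_in(zmq.SUBSCRIBE, b"")  # forward every topic

    # Socket facing the subscribers (the remote log viewers connect here).
    device.bind_out("tcp://*:%d" % backend_port)
    return device


if __name__ == "__main__":
    forwarder = make_forwarder()
    forwarder.start()  # spawns the background process
    forwarder.join()   # block until the device process exits
```

Starting the device once from the main program, before any worker process
is launched, would keep a single process bound to the two ports.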

Thanks for your help!

Best,

¹ http://www.py4bot.org

-- 
    Frédéric


