[zeromq-dev] Understanding binding

Robin Weisberg robin.weisberg at gmail.com
Tue Jul 28 20:24:59 CEST 2009


I'm running into a few issues trying to figure out how the interface related
parameters create_exchange/queue and bind works together. I've attached 2
programs I've used to help illustrate the confusion. I think it would be
great to have a version of these (these may have some issues)  in the zeromq
release as they are helpful for debugging as well as understanding whats
going on. Also could be used for simple shell scripting.

The programs are
zmqsend:
zmqlisten [-q <queue_interface> | -e <exch_interface>] <name server> <topic>

zmqlisten:
Usage: zmqlisten [-q <queue_interface> | -e <exch_interface>] <name server>
<topic> <data>

"topic" is the queue or exchange name depending depending on whether you
pass in -q or -e
"data" is a string that is sent multiple times in between 1 second intervals
from sender to listener (times and interval really should be options).
if you use -q the zmqlisten program creates a global queue (and must be
started first) and the zmqsend program can connect to it.
if you use -e the zmqsend program creates a global exchange (and must be
started first) and the zmqsend program can connect to it.



So here are the results that confuse me.

*1a) Using TCP*
on host A run:
./zmqsend -q "dummy" dev01:5678 "ZZZ" "hello world"
on host B run:
 ./zmqlisten -q "zmq.tcp://eth0:7501" "dev01:5678" "ZZZ"
Now stop the zmqlisten on host B and run w/ a different queue name
./zmqlisten -q "zmq.tcp://eth0:7501" "dev01:5678" "YYYY"
Now on host A run the same command as before:
./zmqsend -q "dummy" dev01:5678 "ZZZ" "hello world"

Host B still receives the messages eventhough Host A is publishing w/ a
different queue name?


*1b) Same issue using pgm*
on hast A run:
./zmqsend -e "zmq.pgm://eth0;226.0.0.130:7500" dev01:5678 "R1" "hello world"
on host B run:
./zmqlisten -e "eth0*;*226.0.0.130:7500" "dev01:5678" "R1"
on host A run
./zmqsend -e "zmq.pgm://eth0;226.0.0.130:7500" dev01:5678 "R2" "hello world"

Again the exchange has changed bug listener still is getting the messages.

*2) I can't seem to register a global queue using PGM*. I've tried:
./zmqlisten -q "zmq.pgm://eth0;226.0.0.130:7500" "dev01:5678" "R8"
interface to connect: zmq.pgm://eth0;226.0.0.130:7500; Listening to global
queue [R8]
zmqlisten: ./zmq/engine_base.hpp:77: void zmq::engine_base_t<HAS_IN,
HAS_OUT>::send_to(zmq::pipe_t*) [with bool HAS_IN = false, bool HAS_OUT =
true]: Assertion `HAS_IN' failed.
Aborted


Thanks!
Robin
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20090728/e96c3876/attachment.htm>
-------------- next part --------------
#include <stdio.h>
#include <zmq.hpp>

using namespace std;

int main (int argc, char *argv [])
{
    if (argc != 6) {
        printf("Usage: zmqlisten [-q <queue_interface> | -e <exch_interface>] <name server> <topic> <data>\n");
        return 1;
    }

    //  Input arguments parsing.
    bool useGlobalQueue = false;
    if (strcmp(argv[1], "-q") == 0) {
        useGlobalQueue = true;
    }
    else if (strcmp(argv[1], "-e") == 0) {
        useGlobalQueue = false;
    }
    else {
        printf("Must specify -q or -e option\n");
        return 1;
    }

    char interface[256];
    if (useGlobalQueue) {
        strncpy(interface, argv[2], sizeof(interface));
        interface[sizeof(interface) - 1] = 0;
    }
    else {
        zmq_snprintf(interface, sizeof(interface), "%s", argv [2]);
        interface[sizeof(interface) - 1] = 0;
    }

    const char *host = argv [3];

    //  Global exchange name.
    const char *topic = argv[4];

    const char* data = argv[5];

    //  Create locator.
    zmq::locator_t locator (host);

    //  Create dispatcher.
    zmq::dispatcher_t dispatcher(2);

    //  Create IO thread.
    zmq::i_thread *worker = zmq::io_thread_t::create (&dispatcher);

    //  Create api thread.
    zmq::api_thread_t *api = zmq::api_thread_t::create (&dispatcher, &locator);

    int ex_id = 0;
    if (useGlobalQueue) {
        printf("interface to connect: %s; Sending on global queue [%s]\n", interface, topic);
        ex_id = api->create_exchange ("L_EXCH");
        api->bind("L_EXCH", topic, worker, worker, interface);
    }
    else {
        printf("interface to connect: %s; Sending on global exchange [%s]\n", interface, topic);
        //  Create global uplink exchange.
        ex_id = api->create_exchange (topic, zmq::scope_global,
            interface, worker, 1, &worker);
    }

    for (int i = 0; i < 10; i++) {
        sleep(1);
        zmq::message_t message_out(strlen(data));
        memcpy(message_out.data(), data, strlen(data));
        char x[5];
        snprintf(x, 5, "%d", i);
        memcpy(message_out.data(), x, strlen(x));
        api->send(ex_id, message_out, true);
//    message_out.rebuild(strlen(data));
//    memcpy(message_out.data(), data, strlen(data));
//    printf("data after send [%.*s]\n", (int)message_out.size(), (char*)message_out.data());
//    api->send(ex_id, message_out, true);
    }

    sleep(1);
    return 0;
}
-------------- next part --------------
#include <stdio.h>
#include <zmq.hpp>

using namespace std;

bool onMsgError(const char* error) {
    printf("error: %s\n", error);
    return true;
}

int main (int argc, char *argv [])
{
    if (argc != 5) {
        printf("Usage: zmqlisten [-q <queue_interface> | -e <exch_interface>] <name server> <topic>\n");
        return 1;
    }

    bool useGlobalQueue = false;
    if (strcmp(argv[1], "-q") == 0) {
        useGlobalQueue = true;
    }
    else if (strcmp(argv[1], "-e") == 0) {
        useGlobalQueue = false;
    }
    else {
        printf("Must specify -q or -e option\n");
        return 1;
    }

    const char* interface = argv[2];

    //  Input arguments parsing.
    const char *host = argv [3];

    //  Global exchange name.
    const char *topic = argv[4];

    //  Create dispatcher.
    zmq::dispatcher_t dispatcher(2);
    //  Create IO thread.
    zmq::i_thread *worker = zmq::io_thread_t::create (&dispatcher);
    //  Create locator.
    zmq::locator_t locator (host);
    //  Create api thread.
    zmq::api_thread_t *api = zmq::api_thread_t::create (&dispatcher, &locator);
    api->mask (zmq::message_gap);
    zmq::set_error_handler(onMsgError);

    if (useGlobalQueue) {
        printf("interface to connect: %s; Listening to global queue [%s]\n", interface, topic);
        api->create_queue (topic, zmq::scope_global, interface, worker, 1, &worker);
    }
    else {
        printf("interface to connect: %s; Listening to global exch [%s]\n", interface, topic);
        //  Local queue name.
        char q_name [] = "L_QUEUE";
        //  Create local queue.
        api->create_queue(q_name);
        //  Bind local queue to global exchange.
        api->bind (topic, q_name, worker, worker, interface);
    }

    while (true) {
        zmq::message_t message;
        int queueID = api->receive (&message, true);
        if (message.type() == zmq::message_gap) {
            printf("Message gap\n");
        }
        else {
            char* data = (char*)message.data();
            printf("QueueID=%d, DATA[%.*s]\n", queueID, (int)message.size(), data);
        }
    }

    return 0;
}


More information about the zeromq-dev mailing list