[zeromq-dev] Is it possible to poll the input/output of a specific peer connected to the router?

jason.liuxi at gmail.com jason.liuxi at gmail.com
Wed May 27 15:01:00 CEST 2015


Hi all:
    Is it possible to poll the input/output of a specific peer connected to the router?
    For example, I got two dealer as clients, one is fast, another is slow, they all connected to a router. The router send message to both of them,  the slow dealer's input queue will soon be blocked due to hwm, I set the zmq_router_mandatory on the router, so the router will not discard any message.If I poll on the output of the router, it will always be available for writing  because  the existence of the fast client. Is it possible to just poll on the output of the slow peer?
    I know I could just let the router discard the message and let the slow client detect message missing and then do a reconnect or something, but what I want is to
hold the sending process when one peer is blocked, and when the slow peer recovers, the router continue to send message to it from where the slow client was blocked. Is is possible?
    below is some code to illustrate what I mean.

#include <zmq.h>
#include <assert.h>
#include <thread>
#include <map>
#include <string>
#include <iostream>
#include <mutex>

void* ctx;
void* router;
void* slow_dealer;
void* fast_dealer;

std::string addr = "inproc://hi";
std::string slow_id = "slow";
std::string fast_id = "fast";

std::mutex mut;

void client(const std::string& id, void* dealer, int milliseconds)
{
int index;

while (zmq_recv(dealer, &index, sizeof(index), 0))
{
{
std::unique_lock<std::mutex> lock(mut);
std::cout << id << " receive" << index << std::endl;
}
Sleep(milliseconds);
}
}

int main()
{
ctx = zmq_ctx_new();
assert(ctx);

router = zmq_socket(ctx, ZMQ_ROUTER);
assert(router);

int ret;

int mandatory = 1;
ret = zmq_setsockopt(router, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof(mandatory));
assert(ret == 0);

int snd_hwm = 1;
ret = zmq_setsockopt(router, ZMQ_SNDHWM, &snd_hwm, sizeof(snd_hwm));
assert(ret == 0);

ret = zmq_bind(router, "inproc://hi");



{
slow_dealer = zmq_socket(ctx, ZMQ_DEALER);
assert(slow_dealer);

ret = zmq_setsockopt(slow_dealer, ZMQ_IDENTITY, slow_id.data(), slow_id.size());

int rcv_hwm = 1;
ret = zmq_setsockopt(slow_dealer, ZMQ_RCVHWM, &rcv_hwm, sizeof(rcv_hwm));
assert(ret == 0);

ret = zmq_connect(slow_dealer, "inproc://hi");
assert(ret == 0);

ret = zmq_send(slow_dealer, "Hi", 2, 0);
assert(ret == 2);
}

{
fast_dealer = zmq_socket(ctx, ZMQ_DEALER);
assert(fast_dealer);

ret = zmq_setsockopt(fast_dealer, ZMQ_IDENTITY, fast_id.data(), fast_id.size());

int rcv_hwm = 1;
ret = zmq_setsockopt(fast_dealer, ZMQ_RCVHWM, &rcv_hwm, sizeof(rcv_hwm));
assert(ret == 0);

ret = zmq_connect(fast_dealer, "inproc://hi");
assert(ret == 0);

ret = zmq_send(fast_dealer, "Hi", 2, 0);
assert(ret == 2);
}

std::map<std::string, int> client_map;
while (client_map.size() < 2)
{
char buf[128];
ret = zmq_recv(router, buf, 128, 0);
assert(ret > 0);
client_map[std::string(buf, ret)] = 0;

ret = zmq_recv(router, buf, 128, 0);
assert(ret > 0);
}

std::thread slow_client(std::bind(&client, slow_id, slow_dealer, 10000));
std::thread fast_client(std::bind(&client, fast_id, fast_dealer, 0));

zmq_pollitem_t items[1];
items[0].socket = router;
items[0].events = ZMQ_POLLOUT;

int max_index = 100;


while (true)
{
ret = zmq_poll(items, 1, -1);
if (items[0].revents & ZMQ_POLLOUT)
{
for (auto& iter : client_map)
{
if (iter.second < max_index)
{
const std::string& client_id = iter.first;
int& index = iter.second;

ret = zmq_send(router, client_id.data(), client_id.size(), ZMQ_DONTWAIT | ZMQ_SNDMORE);
if (ret == -1 && zmq_errno() == EAGAIN)
continue;
assert(ret > 0);
ret = zmq_send(router, &index, sizeof(index), ZMQ_DONTWAIT);
assert(ret > 0);

index++;
}
else
{
std::unique_lock<std::mutex> lock(mut);
std::cout << " router unnessesary wakeup" << std::endl;
}
}
}
}
}


jason.liuxi at gmail.com
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20150527/3ce7a620/attachment.htm>


More information about the zeromq-dev mailing list