[zeromq-dev] Regarding UDP engine and RAW sockets

Leonardo José Consoni consoni_2519 at hotmail.com
Sat May 14 08:19:26 CEST 2016


Hello everyone.

I'm investigating the possibility of adding UDP raw (datagram) sockets to ZeroMQ, for compatibility with applications that use the BSD sockets API directly, replicating what is already done for TCP raw (stream) sockets, in combination with the recently added UDP engine.

I opened a pull request for it, but I'm still tweaking things while diving into the code base and learning how it works: https://github.com/zeromq/libzmq/pull/1986

According to the examples, communication with raw sockets works by sending or receiving 2 messages: the first containing a remote peer identifier and the second the actual data transmitted. So the user has to handle it manually, with the ZMQ_SNDMORE flag when sending (and the ZMQ_RCVMORE socket option when receiving).
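
For illustration, the two-part exchange looks roughly like this (error handling omitted; s is assumed to be a bound ZMQ_STREAM socket):

    unsigned char id [256], data [256];
    int id_size = zmq_recv (s, id, sizeof id, 0);        //  frame 1: peer identifier
    int data_size = zmq_recv (s, data, sizeof data, 0);  //  frame 2: actual payload

    //  A reply follows the same convention: identifier first, flagged
    //  with ZMQ_SNDMORE, then the data.
    zmq_send (s, id, id_size, ZMQ_SNDMORE);
    zmq_send (s, data, data_size, 0);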

For now, I'm trying to mimic that for dgram_t (a new class I added) by copying the IPv4 address structure (sockaddr_in) into the peer identifier, so that we can acquire a unique identifier with a recvfrom() call and use the same value to know where to send the message buffer with sendto(), at the UDP engine level. The higher-level logic (dgram_t::xsend and dgram_t::xrecv) was simply copied from stream_t.
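
At the engine level, the receive path I have in mind looks roughly like this (a simplified sketch, not the exact code in the PR):

    //  Receive the datagram together with the sender's address.
    struct sockaddr_in peer;
    socklen_t peer_len = sizeof peer;
    int nbytes = recvfrom (fd, in_buffer, MAX_UDP_MSG, 0,
                           (struct sockaddr *) &peer, &peer_len);

    //  Frame 1: the raw sockaddr_in serves as the peer identifier.
    msg_t id_msg;
    id_msg.init_size (sizeof peer);
    memcpy (id_msg.data (), &peer, sizeof peer);
    id_msg.set_flags (msg_t::more);
    session->push_msg (&id_msg);

    //  Frame 2: the payload itself, pushed like in udp_engine_t::in_event ().
    //  For sending, the same identifier is copied back into a sockaddr_in
    //  and passed to sendto().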

On the other hand, socket types like radio and dish, even though they use identifiers/groups as well, do it with a single message, by putting content data and group data in different fields (accessed with the zmq_msg_data() and zmq_msg_*group() functions).
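
For example, on the radio side (draft API) the group and the payload travel together in one message (the "TV" group and 'radio' socket variable are just placeholders):

    zmq_msg_t msg;
    zmq_msg_init_size (&msg, 5);
    memcpy (zmq_msg_data (&msg), "hello", 5);
    zmq_msg_set_group (&msg, "TV");   //  the group is stored inside the message
    zmq_msg_send (&msg, radio, 0);    //  'radio' is a ZMQ_RADIO socket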

Looking at msg.hpp, I can see that messages have a union of structs to handle different types of messages, and all of them contain the fields

struct {
    ...
    char group [16];                        // I guess that radio/dish, pub/sub use this
    uint32_t routing_id;                    // I guess that router/dealer uses this
};

which wouldn't be used for ZMQ_STREAM and ZMQ_DGRAM messages if we handle everything as content data, leaving 20 bytes unused in the message.

Why don't raw stream sockets use this available memory to store a peer identifier, avoiding the need to manually send two messages?

In the case of datagrams, wouldn't it be a good idea to add a new struct to this union with a field **unsigned char address[20]** to store the address info? According to Beej's guide (http://beej.us/guide/bgnet/output/html/multipage/ipstructsdata.html), the structure

    struct sockaddr_in {
        short int          sin_family;  // Address family, AF_INET
        unsigned short int sin_port;    // Port number
        struct in_addr     sin_addr;    // Internet address (32 bits)
        unsigned char      sin_zero[8]; // Same size as struct sockaddr
    };

has **2 + 2 + 4 + 8 = 16** bytes of data, which fits the available memory.

The downside is that we couldn't store IPv6 addresses

    struct sockaddr_in6 {
        u_int16_t       sin6_family;   // address family, AF_INET6
        u_int16_t       sin6_port;     // port number, Network Byte Order
        u_int32_t       sin6_flowinfo; // IPv6 flow information
        struct in6_addr sin6_addr;     // IPv6 address
        u_int32_t       sin6_scope_id; // Scope ID
    };

    struct in6_addr {
        unsigned char   s6_addr[16];   // IPv6 address
    };

as they need **2 + 2 + 4 + 16 + 4 = 28** bytes. But maybe we could create a common structure to store both kinds of address info, ignore the **sin6_flowinfo** and **sin6_scope_id** fields (storing exactly 20 bytes), and rebuild the sockaddr_in* structures on demand:

    typedef struct socket_address {
        uint16_t      family;     // stores sin_family or sin6_family (or ZMQ_IPV4 or ZMQ_IPV6)
        uint16_t      port;       // stores sin_port or sin6_port
        unsigned char host [16];  // stores sin_addr (first 4 bytes) or sin6_addr
    } socket_address_t;
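
Rebuilding the native structure from this compact form could look something like this (a rough sketch; I'm using AF_INET/AF_INET6 directly as the family tag here, and losing flowinfo/scope_id is the trade-off mentioned above):

    #include <string.h>
    #include <sys/socket.h>
    #include <netinet/in.h>

    static void rebuild_sockaddr (const socket_address_t *src,
                                  struct sockaddr_storage *dst, socklen_t *len)
    {
        memset (dst, 0, sizeof *dst);
        if (src->family == AF_INET) {
            struct sockaddr_in *sa = (struct sockaddr_in *) dst;
            sa->sin_family = AF_INET;
            sa->sin_port = src->port;              //  kept in network byte order
            memcpy (&sa->sin_addr, src->host, 4);
            *len = sizeof *sa;
        }
        else {
            struct sockaddr_in6 *sa = (struct sockaddr_in6 *) dst;
            sa->sin6_family = AF_INET6;
            sa->sin6_port = src->port;
            memcpy (&sa->sin6_addr, src->host, 16);
            *len = sizeof *sa;                     //  flowinfo and scope_id are lost
        }
    }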

Obviously, as there are zmq_msg_*routing_id() and zmq_msg_*group() functions, we would need zmq_msg_*address() as well.
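
Something along these lines (purely hypothetical signatures, modelled on the group accessors):

    int zmq_msg_set_address (zmq_msg_t *msg, const socket_address_t *address);
    const socket_address_t *zmq_msg_address (zmq_msg_t *msg);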

Or, as @hitstergtd suggested on the GitHub issue, more direct and intuitive zmq_msg_send_to() and zmq_msg_recv_from() calls. Aside from the socket, message and flags, these calls would take an opaque void* pointer to the address as an extra parameter (so it could also be used to point to ZMQ_STREAM peer identifiers). With a NULL value, send_to() would send the message to the address the socket was connected/bound to (which could be useful for multicast).
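
Again purely hypothetical, following the shape of zmq_msg_send()/zmq_msg_recv():

    int zmq_msg_send_to (zmq_msg_t *msg, void *socket, int flags, const void *address);
    int zmq_msg_recv_from (zmq_msg_t *msg, void *socket, int flags, void *address);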

Taking a better look at udp_engine.cpp, where the messages are actually sent/received:

void zmq::udp_engine_t::out_event()
{
    msg_t group_msg;
    int rc = session->pull_msg (&group_msg);
    errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));

    if (rc == 0) {
        msg_t body_msg;
        rc = session->pull_msg (&body_msg);

        size_t group_size = group_msg.size ();
        size_t body_size = body_msg.size ();
        size_t size = group_size + body_size + 1;

        out_buffer[0] = (unsigned char) group_size;
        memcpy (out_buffer + 1, group_msg.data (), group_size);
        memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);

        [...]

        rc = sendto (fd, out_buffer, size, 0,
            address->resolved.udp_addr->dest_addr (),
            address->resolved.udp_addr->dest_addrlen ());
        errno_assert (rc != -1);
    }
    [...]
}

void zmq::udp_engine_t::in_event()
{
    int nbytes = recv(fd, in_buffer, MAX_UDP_MSG, 0);
    [...]

    int group_size = in_buffer[0];

    //  This doesn't fit, just ignore
    if (nbytes - 1 < group_size)
        return;

    int body_size = nbytes - 1 - group_size;

    msg_t msg;
    int rc = msg.init_size (group_size);
    errno_assert (rc == 0);
    msg.set_flags (msg_t::more);
    memcpy (msg.data (), in_buffer + 1, group_size);

    rc = session->push_msg (&msg);
    [...]

    [...]
    memcpy (msg.data (), in_buffer + 1 + group_size, body_size);
    rc = session->push_msg (&msg);
    [...]
}

I can see that the single message, with group information set via zmq_msg_data() and zmq_msg_set_group() for radio/dish sockets (the only ones currently using the UDP engine), becomes, at this point, 2 separate messages, pulled from/pushed to the session.

Further investigation showed that this separation only happens in the push_msg() and pull_msg() overrides of radio_session_t and dish_session_t. Why isn't the message simply forwarded, as in session_base_t, with the group info extracted from/filled into the u.base.group field of msg_t? Obviously I don't understand all the code, but for now I don't see a reason for it.
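
Roughly, what I imagine for out_event() is something like this, with the group travelling inside the message itself instead of as a separate frame (just a sketch, I may well be missing something):

    msg_t msg;
    int rc = session->pull_msg (&msg);
    errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));

    if (rc == 0) {
        const char *group = msg.group ();          //  read from u.base.group
        size_t group_size = strlen (group);
        size_t body_size = msg.size ();

        out_buffer[0] = (unsigned char) group_size;
        memcpy (out_buffer + 1, group, group_size);
        memcpy (out_buffer + 1 + group_size, msg.data (), body_size);

        //  ... sendto () as before ...
    }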

Could I ask for some feedback on my proposed additions/changes, and for some insight into the need for a different session push/pull for radio and dish sockets?

Sorry for the long post and possibly bad English. Thanks in advance.


More information about the zeromq-dev mailing list