[zeromq-dev] unix ipc for 0MQ2

Jon Dyte jon at totient.co.uk
Thu Jan 14 22:12:28 CET 2010


Hi

Below is a patch to provide ipc support for 0MQ2 on unix via Unix Domain 
sockets.

Some further refactoring of the code in the tcp* classes is still possible.
Some man pages need updating and some performance tests need adding,
but I'd rather get the code out now for review.

So far I have only tested that this builds and works on Linux.

The IPC support provided is not available on Windows.


example server :-

#include <zmq.hpp>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <string>

//  Example server: binds a ZMQ_REP socket to the ipc endpoint
//  ipc:///tmp/test_zmq_ipc and then loops forever, printing the int
//  carried by each request and answering with a default-constructed
//  message.
int main()
{
  zmq::context_t ctx (1, 1);
  zmq::socket_t rep (ctx, ZMQ_REP);
  rep.bind ("ipc:///tmp/test_zmq_ipc");

  while (true)
  {
    zmq::message_t request;
    rep.recv(&request);

    //  Copy the payload out rather than casting the data pointer.
    //  NOTE(review): assumes the request holds at least sizeof(int)
    //  bytes, which is what the example client sends.
    int d ;
    memcpy(&d, request.data(),  sizeof(int));
    printf ("%d \n", d);

    //  Send a reply for every request received.
    zmq::message_t response;
    rep.send(response);
  }
}

example client:-

// zmq
#include <zmq.hpp>
#include <unistd.h>
#include <string.h>
#include <iostream>

//  Example client: connects a ZMQ_REQ socket to the ipc endpoint
//  ipc:///tmp/test_zmq_ipc and performs 1000 request/reply round trips,
//  sending the loop counter as the payload of each request.
//  argc and argv are not used.
int main(int argc , char*argv[])
{

  zmq::context_t ctx (1, 1);

  zmq::socket_t s (ctx, ZMQ_REQ);

  s.connect ("ipc:///tmp/test_zmq_ipc");

  for (int i = 0 ; i < 1000 ; ++i)
  {
    //  The request body is the raw bytes of the counter.
    zmq::message_t request(sizeof(int));
    memcpy(request.data(), &i, sizeof(int));
    s.send(request);

    //  Receive the reply before issuing the next request; its content
    //  is not inspected.
    zmq::message_t response;
    s.recv(&response);
  }
  return 0;

}


I'm distributing these under the MIT License.

feedback welcome,

enjoy.
Jon
diff --git a/src/ip.cpp b/src/ip.cpp
index 50af2ce..0c4adb7 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -309,3 +309,16 @@ int zmq::resolve_ip_hostname (sockaddr_in *addr_, const 
char *hostname_)
 
     return 0;
 }
+
+int zmq::resolve_local_path( sockaddr_un * addr_, const char* path_)
+{
+    
+    if (::strlen(path_) >= sizeof(addr_->sun_path))
+    {
+        errno = ENAMETOOLONG;
+        return -1;
+    }
+    ::strcpy(addr_->sun_path, path_);
+    addr_->sun_family = AF_LOCAL;
+    return 0;
+}
diff --git a/src/ip.hpp b/src/ip.hpp
index 8a0a34f..65364b5 100644
--- a/src/ip.hpp
+++ b/src/ip.hpp
@@ -30,6 +30,7 @@
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include <netdb.h>
+#include <sys/un.h>
 #endif
 
 namespace zmq
@@ -42,6 +43,10 @@ namespace zmq
     //  This function resolves a string in <hostname>:<port-number> format.
     //  Hostname can be either the name of the host or its IP address.
     int resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_);
+
+
+    // This function sets up the sockaddr_un structure with the pathname_
+    int resolve_local_path( sockaddr_un * addr_, const char* pathname_);
 }
 
 #endif 
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ad68fdb..7ec546a 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -87,11 +87,21 @@ int zmq::socket_base_t::bind (const char *addr_)
     if (addr_type == "inproc")
         return register_endpoint (addr_args.c_str (), this);
 
-    if (addr_type == "tcp") {
+    if (addr_type == "tcp" || addr_type == "ipc" ) {
+
+#if defined ZMQ_HAVE_WINDOWS
+        if ( addr_type == "ipc" ) {
+            errno = EPROTONOSUPPORT;
+            return -1;
+        }
+#endif
+
+
+
         zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
             choose_io_thread (options.affinity), this, options);
         zmq_assert (listener);
-        int rc = listener->set_address (addr_args.c_str ());
+        int rc = listener->set_address (addr_type.c_str(), addr_args.c_str 
());
         if (rc != 0)
             return -1;
 
@@ -202,7 +212,14 @@ int zmq::socket_base_t::connect (const char *addr_)
     send_plug (session);
     send_own (this, session);
 
-    if (addr_type == "tcp") {
+    if (addr_type == "tcp" || addr_type == "ipc" ) {
+
+#if defined ZMQ_HAVE_WINDOWS
+        if ( addr_type == "ipc" ) {
+            errno = EPROTONOSUPPORT;
+            return -1;
+        }
+#endif
 
         //  Create the connecter object. Supply it with the session name
         //  so that it can bind the new connection to the session once
@@ -211,7 +228,7 @@ int zmq::socket_base_t::connect (const char *addr_)
             choose_io_thread (options.affinity), this, options,
             session->get_ordinal (), false);
         zmq_assert (connecter);
-        int rc = connecter->set_address (addr_args.c_str ());
+        int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str 
());
         if (rc != 0) {
             delete connecter;
             return -1;
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 5dcebba..76c7153 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -18,7 +18,7 @@
 */
 
 #include <string>
-
+#include <string.h>
 #include "tcp_connecter.hpp"
 #include "platform.hpp"
 #include "ip.hpp"
@@ -38,10 +38,15 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
         close ();
 }
 
-int zmq::tcp_connecter_t::set_address (const char *addr_)
+int zmq::tcp_connecter_t::set_address (const char * protocol_, const char 
*addr_)
 {
+    if ( ::strcmp(protocol_, "tcp") != 0 ) {
+        errno = EPROTONOSUPPORT;
+        return -1;
+    }
+
     //  Convert the hostname into sockaddr_in structure.
-    return resolve_ip_hostname (&addr, addr_);
+    return resolve_ip_hostname ((sockaddr_in*)&addr, addr_);
 }
 
 int zmq::tcp_connecter_t::open ()
@@ -67,7 +72,7 @@ int zmq::tcp_connecter_t::open ()
     wsa_assert (rc != SOCKET_ERROR);
 
     //  Connect to the remote peer.
-    rc = ::connect (s, (sockaddr*) &addr, sizeof addr);
+    rc = ::connect (s, (sockaddr*) &addr, sizeof sockaddr_in);
 
     //  Connect was successfull immediately.
     if (rc == 0)
@@ -143,57 +148,100 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
         close ();
 }
 
-int zmq::tcp_connecter_t::set_address (const char *addr_)
+int zmq::tcp_connecter_t::set_address (const char* protocol_, const char 
*addr_)
 {
-    //  Convert the hostname into sockaddr_in structure.
-    return resolve_ip_hostname (&addr, addr_);
+    if ( ::strcmp( protocol_, "tcp") == 0) {
+        //  Convert the hostname into sockaddr_in structure.
+        return resolve_ip_hostname ((struct sockaddr_in*)&addr, addr_);
+    }
+    else if ( ::strcmp(protocol_,"ipc") == 0) {
+        // convert the path in addr_ into the sockaddr_un structure.
+        return resolve_local_path (( struct sockaddr_un*)&addr, addr_);
+    }
+
+    errno = EPROTONOSUPPORT;
+    return -1;
 }
 
 int zmq::tcp_connecter_t::open ()
 {
     zmq_assert (s == retired_fd);
 
-    //  Create the socket.
-    s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
-    if (s == -1)
-        return -1;
+    struct sockaddr * sa = (struct sockaddr*) & addr;
 
-    // Set to non-blocking mode.
-    int flags = fcntl (s, F_GETFL, 0);
-    if (flags == -1) 
-        flags = 0;
-    int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
-    errno_assert (rc != -1);
+    if ( AF_INET == sa->sa_family) {
+        //  Create the socket.
+        s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+        if (s == -1)
+            return -1;
 
-    //  Disable Nagle's algorithm.
-    int flag = 1;
-    rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof 
(int));
-    errno_assert (rc == 0);
+        // Set to non-blocking mode.
+        int flags = fcntl (s, F_GETFL, 0);
+        if (flags == -1) 
+            flags = 0;
+        int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
+        errno_assert (rc != -1);
+
+        //  Disable Nagle's algorithm.
+        int flag = 1;
+        rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof 
(int));
+        errno_assert (rc == 0);
 
 #ifdef ZMQ_HAVE_OPENVMS
-    //  Disable delayed acknowledgements.
-    flag = 1;
-    rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof 
(int));
-    errno_assert (rc != SOCKET_ERROR);
+        //  Disable delayed acknowledgements.
+        flag = 1;
+        rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof 
(int));
+        errno_assert (rc != SOCKET_ERROR);
 #endif
 
-    //  Connect to the remote peer.
-    rc = ::connect (s, (sockaddr*) &addr, sizeof (addr));
+        //  Connect to the remote peer.
+        rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
 
-    //  Connect was successfull immediately.
-    if (rc == 0)
-        return 0;
+        //  Connect was successfull immediately.
+        if (rc == 0)
+            return 0;
 
-    //  Asynchronous connect was launched.
-    if (rc == -1 && errno == EINPROGRESS) {
-        errno = EAGAIN;
+        //  Asynchronous connect was launched.
+        if (rc == -1 && errno == EINPROGRESS) {
+            errno = EAGAIN;
+            return -1;
+        }
+
+        //  Error occured.
+        int err = errno;
+        close ();
+        errno = err;
+        return -1;
+
+    }
+    else if ( AF_LOCAL == sa->sa_family)
+    {
+        s = socket (AF_LOCAL, SOCK_STREAM, 0);
+        if (s == -1)
+            return -1;
+
+        //  Set the non-blocking flag.
+        int flag = fcntl (s, F_GETFL, 0);
+        if (flag == -1) 
+            flag = 0;
+        int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+        errno_assert (rc != -1);
+        //  Connect to the remote peer.
+        rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_un));
+
+        //  Connect was successfull immediately.
+        if (rc == 0)
+            return 0;
+
+        //  Error occured.
+        int err = errno;
+        close ();
+        errno = err;
         return -1;
+
     }
 
-    //  Error occured.
-    int err = errno;
-    close ();
-    errno = err;
+
     return -1;
 }
 
diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp
index d397296..cd67ee2 100644
--- a/src/tcp_connecter.hpp
+++ b/src/tcp_connecter.hpp
@@ -36,7 +36,7 @@ namespace zmq
         ~tcp_connecter_t ();
 
         //  Set IP address/port to connect to.
-        int set_address (const char *addr_);
+        int set_address (const char* protocol, const char *addr_);
 
         //  Open TCP connecting socket. Address is in
         //  <hostname>:<port-number> format. Returns -1 in case of error,
@@ -58,7 +58,7 @@ namespace zmq
     private:
 
         //  Address to connect to.
-        sockaddr_in addr;
+        sockaddr_storage addr;
 
         //  Underlying socket.
         fd_t s;
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 9087405..bf99b8c 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -39,10 +39,16 @@ zmq::tcp_listener_t::~tcp_listener_t ()
         close ();
 }
 
-int zmq::tcp_listener_t::set_address (const char *addr_)
+int zmq::tcp_listener_t::set_address (const char * protocol_, const char 
*addr_)
 {
+
+    if ( ::strcmp(protocol_, "tcp") != 0 ) {
+        errno = EPROTONOSUPPORT;
+        return -1;
+    }
+
     //  Convert the interface into sockaddr_in structure.
-    int rc = resolve_ip_interface (&addr, addr_);
+    int rc = resolve_ip_interface ((sockaddr_in*)&addr, addr_);
     if (rc != 0)
         return rc;
 
@@ -65,7 +71,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
     wsa_assert (rc != SOCKET_ERROR);
 
     //  Bind the socket to the network interface and port.
-    rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
+    rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
     if (rc == SOCKET_ERROR) {
         wsa_error_to_errno ();
         return -1;
@@ -131,6 +137,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
 #include <netinet/in.h>
 #include <netdb.h>
 #include <fcntl.h>
+#include <sys/un.h>
 
 zmq::tcp_listener_t::tcp_listener_t () :
     s (retired_fd)
@@ -144,44 +151,86 @@ zmq::tcp_listener_t::~tcp_listener_t ()
         close ();
 }
 
-int zmq::tcp_listener_t::set_address (const char *addr_)
+int zmq::tcp_listener_t::set_address (const char* protocol_, const char 
*addr_)
 {
-    //  Convert the interface into sockaddr_in structure.
-    int rc = resolve_ip_interface (&addr, addr_);
-    if (rc != 0)
-        return rc;
-
-    //  Create a listening socket.
-    s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
-    if (s == -1)
-        return -1;
-
-    //  Allow reusing of the address.
-    int flag = 1;
-    rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
-    errno_assert (rc == 0);
 
-    //  Set the non-blocking flag.
-    flag = fcntl (s, F_GETFL, 0);
-    if (flag == -1) 
-        flag = 0;
-    rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
-    errno_assert (rc != -1);
-
-    //  Bind the socket to the network interface and port.
-    rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
-    if (rc != 0) {
-        close ();
-        return -1;
+    if ( ::strcmp(protocol_, "tcp") == 0 ) {
+
+        //  Convert the interface into sockaddr_in structure.
+        int rc = resolve_ip_interface ((struct sockaddr_in*)&addr, addr_);
+        if (rc != 0)
+            return rc;
+
+        //  Create a listening socket.
+        s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+        if (s == -1)
+            return -1;
+
+        //  Allow reusing of the address.
+        int flag = 1;
+        rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
+        errno_assert (rc == 0);
+
+        //  Set the non-blocking flag.
+        flag = fcntl (s, F_GETFL, 0);
+        if (flag == -1) 
+            flag = 0;
+        rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+        errno_assert (rc != -1);
+
+        //  Bind the socket to the network interface and port.
+        rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
+        if (rc != 0) {
+            close ();
+            return -1;
+        }
+
+        //  Listen for incomming connections.
+        rc = listen (s, tcp_connection_backlog);
+        if (rc != 0) {
+            close ();
+            return -1;
+        }
     }
-              
-    //  Listen for incomming connections.
-    rc = listen (s, tcp_connection_backlog);
-    if (rc != 0) {
-        close ();
+    else if ( ::strcmp(protocol_, "ipc") == 0) {
+
+        (void)::unlink(addr_);
+        
+        int rc = resolve_local_path((struct sockaddr_un*) &addr, addr_);
+        if (rc != 0)
+            return rc;
+
+        s = socket (AF_LOCAL, SOCK_STREAM, 0);
+        if (s == -1)
+            return -1;
+
+        //  Set the non-blocking flag.
+        int flag = fcntl (s, F_GETFL, 0);
+        if (flag == -1) 
+            flag = 0;
+        rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+        errno_assert (rc != -1);
+
+        //  Bind the socket to the file path.
+        rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_un));
+        if (rc != 0) {
+            close ();
+            return -1;
+        }
+
+        //  Listen for incomming connections.
+        rc = listen (s, tcp_connection_backlog);
+        if (rc != 0) {
+            close ();
+            return -1;
+        }
+    }
+    else {
+        errno = EPROTONOSUPPORT;
         return -1;
     }
 
+
     return 0;
 }
 
@@ -192,6 +241,16 @@ int zmq::tcp_listener_t::close ()
     if (rc != 0)
         return -1;
     s = retired_fd;
+
+    struct sockaddr * sa = (struct sockaddr*) & addr;
+    if ( AF_LOCAL == sa->sa_family )
+    {
+        struct sockaddr_un* sun = (struct sockaddr_un*) & addr;
+        rc = ::unlink(sun->sun_path);
+        if (rc != 0)
+            return -1;
+    }
+
     return 0;
 }
 
@@ -239,20 +298,24 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
     int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK);
     errno_assert (rc != -1);
 
-    //  Disable Nagle's algorithm.
-    int flag = 1;
-    rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
-        sizeof (int));
-    errno_assert (rc == 0);
+    struct sockaddr * sa = (struct sockaddr*) & addr;
+    if ( AF_INET == sa->sa_family )
+    {
+        //  Disable Nagle's algorithm.
+        int flag = 1;
+        rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
+                         sizeof (int));
+        errno_assert (rc == 0);
+
 
 #ifdef ZMQ_HAVE_OPENVMS
-    //  Disable delayed acknowledgements.
-    flag = 1;
-    rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
-        sizeof (int));
-    errno_assert (rc != SOCKET_ERROR);
+        //  Disable delayed acknowledgements.
+        flag = 1;
+        rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
+                         sizeof (int));
+        errno_assert (rc != SOCKET_ERROR);
 #endif
-
+    }
     return sock;
 }
 
diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp
index 1dfe288..718725b 100644
--- a/src/tcp_listener.hpp
+++ b/src/tcp_listener.hpp
@@ -23,6 +23,8 @@
 #include "fd.hpp"
 #include "ip.hpp"
 
+#include <string>
+
 namespace zmq
 {
 
@@ -35,10 +37,13 @@ namespace zmq
         tcp_listener_t ();
         ~tcp_listener_t ();
 
-        //  Start listening on the interface. Address is in
+        //  Start listening on the interface. 
+        //  If protocol is "tcp" then address is in
         //  <interface-name>:<port-number> format. Interface name may be '*'
         //  to bind to all the interfaces.
-        int set_address (const char *addr_);
+        //  If protocol is "ipc" then address is the
+        //  the file pathname.
+        int set_address (const char* protocol_, const char *addr_);
 
         //  Close the listening socket.
         int close ();
@@ -55,8 +60,8 @@ namespace zmq
     private:
 
         //  IP address/port to listen on.
-        sockaddr_in addr;
-
+        sockaddr_storage addr;
+            
         //  Underlying socket.
         fd_t s;
 
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index d4c2727..e2bd461 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -41,12 +41,13 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
 {
 }
 
-int zmq::zmq_connecter_t::set_address (const char *address_)
+int zmq::zmq_connecter_t::set_address (const char* protocol_, const char 
*address_)
 {
-     int rc = tcp_connecter.set_address (address_);
+     int rc = tcp_connecter.set_address (protocol_, address_);
      if (rc != 0)
          return rc;
      address = address_;
+     protocol = protocol_;
      return 0;
 }
 
@@ -91,7 +92,7 @@ void zmq::zmq_connecter_t::out_event ()
     //  Create an init object. 
     zmq_init_t *init = new (std::nothrow) zmq_init_t (
         choose_io_thread (options.affinity), owner,
-        fd, options, true, address.c_str (), session_ordinal);
+        fd, options, true, protocol.c_str(), address.c_str (), 
session_ordinal);
     zmq_assert (init);
     send_plug (init);
     send_own (owner, init);
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index c3a42a9..88cd478 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -39,8 +39,8 @@ namespace zmq
             const options_t &options_, uint64_t session_ordinal_, bool 
wait_);
         ~zmq_connecter_t ();
 
-        //  Set IP address to connect to.
-        int set_address (const char *address_);
+        //  Set address to connect to.
+        int set_address (const char *protocol_, const char *address_);
 
     private:
 
@@ -77,6 +77,8 @@ namespace zmq
 
         //  Address to connect to.
         std::string address;
+        // Transport
+        std::string protocol;
 
         zmq_connecter_t (const zmq_connecter_t&);
         void operator = (const zmq_connecter_t&);
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index a79c0bd..05b92fb 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -27,7 +27,8 @@
 #include "err.hpp"
 
 zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
-      const options_t &options_, bool reconnect_, const char *address_) :
+      const options_t &options_, bool reconnect_, 
+      const char *protocol_, const char *address_) :
     io_object_t (parent_),
     inpos (NULL),
     insize (0),
@@ -39,9 +40,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t 
fd_,
     options (options_),
     reconnect (reconnect_)
 {
-    if (reconnect)
+    if (reconnect) {
         address = address_;
-
+        protocol = protocol_;
+    }
     //  Initialise the underlying socket.
     int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
     zmq_assert (rc == 0);
@@ -166,7 +168,7 @@ void zmq::zmq_engine_t::error ()
             inout->get_io_thread (), inout->get_owner (),
             options, inout->get_ordinal (), true);
         zmq_assert (reconnecter);
-        reconnecter->set_address (address.c_str ());
+        reconnecter->set_address (protocol.c_str(), address.c_str ());
     }
 
     inout->detach (reconnecter);
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index d26e304..84a0ab3 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -37,7 +37,8 @@ namespace zmq
     public:
 
         zmq_engine_t (class io_thread_t *parent_, fd_t fd_,
-            const options_t &options_, bool reconnect_, const char 
*address_);
+            const options_t &options_, bool reconnect_, 
+            const char *protocol_, const char* address_);
         ~zmq_engine_t ();
 
         //  i_engine interface implementation.
@@ -71,6 +72,7 @@ namespace zmq
 
         bool reconnect;
         std::string address;
+        std::string protocol;
 
         zmq_engine_t (const zmq_engine_t&);
         void operator = (const zmq_engine_t&);
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index c602e1d..70bbd21 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -25,7 +25,7 @@
 
 zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
       fd_t fd_, const options_t &options_, bool reconnect_,
-      const char *address_, uint64_t session_ordinal_) :
+      const char *protocol_, const char* address_, uint64_t 
session_ordinal_) :
     owned_t (parent_, owner_),
     sent (false),
     received (false),
@@ -34,7 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, 
socket_base_t *owner_,
 {
     //  Create the engine object for this connection.
     engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options,
-        reconnect_, address_);
+                                              reconnect_, protocol_, 
address_);
     zmq_assert (engine);
 }
 
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 414adfe..5a131f0 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -41,7 +41,7 @@ namespace zmq
 
         zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_,
             fd_t fd_, const options_t &options_, bool reconnect_,
-            const char *address_, uint64_t session_ordinal_);
+            const char *protocol_, const char* address_, uint64_t 
session_ordinal_);
         ~zmq_init_t ();
 
     private:
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 9ccd82b..4fcc4a9 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -36,9 +36,9 @@ zmq::zmq_listener_t::~zmq_listener_t ()
 {
 }
 
-int zmq::zmq_listener_t::set_address (const char *addr_)
+int zmq::zmq_listener_t::set_address (const char* protocol_, const char 
*addr_)
 {
-     return tcp_listener.set_address (addr_);
+    return tcp_listener.set_address (protocol_, addr_);
 }
 
 void zmq::zmq_listener_t::process_plug ()
@@ -65,7 +65,7 @@ void zmq::zmq_listener_t::in_event ()
     //  Create an init object. 
     io_thread_t *io_thread = choose_io_thread (options.affinity);
     zmq_init_t *init = new (std::nothrow) zmq_init_t (
-        io_thread, owner, fd, options, false, NULL, 0);
+        io_thread, owner, fd, options, false, NULL, NULL, 0);
     zmq_assert (init);
     send_plug (init);
     send_own (owner, init);
diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp
index 6f8cdc9..e179903 100644
--- a/src/zmq_listener.hpp
+++ b/src/zmq_listener.hpp
@@ -31,13 +31,13 @@ namespace zmq
 
     class zmq_listener_t : public owned_t, public io_object_t
     {
-    public:
+      public:
 
-        zmq_listener_t (class io_thread_t *parent_, socket_base_t *owner_,
-            const options_t &options_);
+            zmq_listener_t (class io_thread_t *parent_, socket_base_t 
*owner_,
+	const options_t &options_);
 
-        //  Set IP address to listen on.
-        int set_address (const char *addr_);
+    //  Set address to listen on.
+    int set_address (const char* protocol_, const char *addr_);
 
     private:
 



More information about the zeromq-dev mailing list