[zeromq-dev] unix ipc for 0MQ2
Jon Dyte
jon at totient.co.uk
Thu Jan 14 22:12:28 CET 2010
Hi
Below is a patch to provide ipc support for 0MQ2 on unix via Unix Domain
sockets.
There is some further refactoring of the code in the tcp* classes that could
occur. Some man pages need updating and some performance tests need adding
but I'd rather get the code out now for review.
I have tested this builds and works on Linux only at present.
The ipc support provided is not available on windows.
example server :-
#include <zmq.hpp>
#include <pthread.h>
#include <stdio.h>
#include <string>
int main()
{
zmq::context_t ctx (1, 1);
zmq::socket_t rep (ctx, ZMQ_REP);
rep.bind ("ipc:///tmp/test_zmq_ipc");
while (true)
{
zmq::message_t request;
rep.recv(&request);
int d ;
memcpy(&d, request.data(), sizeof(int));
printf ("%d \n", d);
zmq::message_t response;
rep.send(response);
}
example client:-
// zmq
#include <zmq.hpp>
#include <unistd.h>
#include <iostream>
int main(int argc , char*argv[])
{
zmq::context_t ctx (1, 1);
zmq::socket_t s (ctx, ZMQ_REQ);
s.connect ("ipc:///tmp/test_zmq_ipc");
for (int i = 0 ; i < 1000 ; ++i)
{
zmq::message_t request(sizeof(int));
memcpy(request.data(), &i, sizeof(int));
s.send(request);
zmq::message_t response;
s.recv(&response);
}
return 0;
}
I'm distributing these under the MIT License.
feedback welcome,
enjoy.
Jon
diff --git a/src/ip.cpp b/src/ip.cpp
index 50af2ce..0c4adb7 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -309,3 +309,16 @@ int zmq::resolve_ip_hostname (sockaddr_in *addr_, const
char *hostname_)
return 0;
}
+
+int zmq::resolve_local_path( sockaddr_un * addr_, const char* path_)
+{
+
+ if (::strlen(path_) >= sizeof(addr_->sun_path))
+ {
+ errno = ENAMETOOLONG;
+ return -1;
+ }
+ ::strcpy(addr_->sun_path, path_);
+ addr_->sun_family = AF_LOCAL;
+ return 0;
+}
diff --git a/src/ip.hpp b/src/ip.hpp
index 8a0a34f..65364b5 100644
--- a/src/ip.hpp
+++ b/src/ip.hpp
@@ -30,6 +30,7 @@
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netdb.h>
+#include <sys/un.h>
#endif
namespace zmq
@@ -42,6 +43,10 @@ namespace zmq
// This function resolves a string in <hostname>:<port-number> format.
// Hostname can be either the name of the host or its IP address.
int resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_);
+
+
+ // This function sets up the sockaddr_un structure with the pathname_
+ int resolve_local_path( sockaddr_un * addr_, const char* pathname_);
}
#endif
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ad68fdb..7ec546a 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -87,11 +87,21 @@ int zmq::socket_base_t::bind (const char *addr_)
if (addr_type == "inproc")
return register_endpoint (addr_args.c_str (), this);
- if (addr_type == "tcp") {
+ if (addr_type == "tcp" || addr_type == "ipc" ) {
+
+#if defined ZMQ_HAVE_WINDOWS
+ if ( addr_type == "ipc" ) {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+#endif
+
+
+
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
choose_io_thread (options.affinity), this, options);
zmq_assert (listener);
- int rc = listener->set_address (addr_args.c_str ());
+ int rc = listener->set_address (addr_type.c_str(), addr_args.c_str
());
if (rc != 0)
return -1;
@@ -202,7 +212,14 @@ int zmq::socket_base_t::connect (const char *addr_)
send_plug (session);
send_own (this, session);
- if (addr_type == "tcp") {
+ if (addr_type == "tcp" || addr_type == "ipc" ) {
+
+#if defined ZMQ_HAVE_WINDOWS
+ if ( addr_type == "ipc" ) {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+#endif
// Create the connecter object. Supply it with the session name
// so that it can bind the new connection to the session once
@@ -211,7 +228,7 @@ int zmq::socket_base_t::connect (const char *addr_)
choose_io_thread (options.affinity), this, options,
session->get_ordinal (), false);
zmq_assert (connecter);
- int rc = connecter->set_address (addr_args.c_str ());
+ int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str
());
if (rc != 0) {
delete connecter;
return -1;
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 5dcebba..76c7153 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -18,7 +18,7 @@
*/
#include <string>
-
+#include <string.h>
#include "tcp_connecter.hpp"
#include "platform.hpp"
#include "ip.hpp"
@@ -38,10 +38,15 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
close ();
}
-int zmq::tcp_connecter_t::set_address (const char *addr_)
+int zmq::tcp_connecter_t::set_address (const char * protocol_, const char
*addr_)
{
+ if ( ::strcmp(protocol_, "tcp") != 0 ) {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+
// Convert the hostname into sockaddr_in structure.
- return resolve_ip_hostname (&addr, addr_);
+ return resolve_ip_hostname ((sockaddr_in*)&addr, addr_);
}
int zmq::tcp_connecter_t::open ()
@@ -67,7 +72,7 @@ int zmq::tcp_connecter_t::open ()
wsa_assert (rc != SOCKET_ERROR);
// Connect to the remote peer.
- rc = ::connect (s, (sockaddr*) &addr, sizeof addr);
+ rc = ::connect (s, (sockaddr*) &addr, sizeof sockaddr_in);
// Connect was successfull immediately.
if (rc == 0)
@@ -143,57 +148,100 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
close ();
}
-int zmq::tcp_connecter_t::set_address (const char *addr_)
+int zmq::tcp_connecter_t::set_address (const char* protocol_, const char
*addr_)
{
- // Convert the hostname into sockaddr_in structure.
- return resolve_ip_hostname (&addr, addr_);
+ if ( ::strcmp( protocol_, "tcp") == 0) {
+ // Convert the hostname into sockaddr_in structure.
+ return resolve_ip_hostname ((struct sockaddr_in*)&addr, addr_);
+ }
+ else if ( ::strcmp(protocol_,"ipc") == 0) {
+ // convert the path in addr_ into the sockaddr_un structure.
+ return resolve_local_path (( struct sockaddr_un*)&addr, addr_);
+ }
+
+ errno = EPROTONOSUPPORT;
+ return -1;
}
int zmq::tcp_connecter_t::open ()
{
zmq_assert (s == retired_fd);
- // Create the socket.
- s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (s == -1)
- return -1;
+ struct sockaddr * sa = (struct sockaddr*) & addr;
- // Set to non-blocking mode.
- int flags = fcntl (s, F_GETFL, 0);
- if (flags == -1)
- flags = 0;
- int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc != -1);
+ if ( AF_INET == sa->sa_family) {
+ // Create the socket.
+ s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (s == -1)
+ return -1;
- // Disable Nagle's algorithm.
- int flag = 1;
- rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof
(int));
- errno_assert (rc == 0);
+ // Set to non-blocking mode.
+ int flags = fcntl (s, F_GETFL, 0);
+ if (flags == -1)
+ flags = 0;
+ int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
+ errno_assert (rc != -1);
+
+ // Disable Nagle's algorithm.
+ int flag = 1;
+ rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof
(int));
+ errno_assert (rc == 0);
#ifdef ZMQ_HAVE_OPENVMS
- // Disable delayed acknowledgements.
- flag = 1;
- rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof
(int));
- errno_assert (rc != SOCKET_ERROR);
+ // Disable delayed acknowledgements.
+ flag = 1;
+ rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof
(int));
+ errno_assert (rc != SOCKET_ERROR);
#endif
- // Connect to the remote peer.
- rc = ::connect (s, (sockaddr*) &addr, sizeof (addr));
+ // Connect to the remote peer.
+ rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
- // Connect was successfull immediately.
- if (rc == 0)
- return 0;
+ // Connect was successfull immediately.
+ if (rc == 0)
+ return 0;
- // Asynchronous connect was launched.
- if (rc == -1 && errno == EINPROGRESS) {
- errno = EAGAIN;
+ // Asynchronous connect was launched.
+ if (rc == -1 && errno == EINPROGRESS) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Error occured.
+ int err = errno;
+ close ();
+ errno = err;
+ return -1;
+
+ }
+ else if ( AF_LOCAL == sa->sa_family)
+ {
+ s = socket (AF_LOCAL, SOCK_STREAM, 0);
+ if (s == -1)
+ return -1;
+
+ // Set the non-blocking flag.
+ int flag = fcntl (s, F_GETFL, 0);
+ if (flag == -1)
+ flag = 0;
+ int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+ errno_assert (rc != -1);
+ // Connect to the remote peer.
+ rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_un));
+
+ // Connect was successfull immediately.
+ if (rc == 0)
+ return 0;
+
+ // Error occured.
+ int err = errno;
+ close ();
+ errno = err;
return -1;
+
}
- // Error occured.
- int err = errno;
- close ();
- errno = err;
+
return -1;
}
diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp
index d397296..cd67ee2 100644
--- a/src/tcp_connecter.hpp
+++ b/src/tcp_connecter.hpp
@@ -36,7 +36,7 @@ namespace zmq
~tcp_connecter_t ();
// Set IP address/port to connect to.
- int set_address (const char *addr_);
+ int set_address (const char* protocol, const char *addr_);
// Open TCP connecting socket. Address is in
// <hostname>:<port-number> format. Returns -1 in case of error,
@@ -58,7 +58,7 @@ namespace zmq
private:
// Address to connect to.
- sockaddr_in addr;
+ sockaddr_storage addr;
// Underlying socket.
fd_t s;
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 9087405..bf99b8c 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -39,10 +39,16 @@ zmq::tcp_listener_t::~tcp_listener_t ()
close ();
}
-int zmq::tcp_listener_t::set_address (const char *addr_)
+int zmq::tcp_listener_t::set_address (const char * protocol_, const char
*addr_)
{
+
+ if ( ::strcmp(protocol_, "tcp") != 0 ) {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+
// Convert the interface into sockaddr_in structure.
- int rc = resolve_ip_interface (&addr, addr_);
+ int rc = resolve_ip_interface ((sockaddr_in*)&addr, addr_);
if (rc != 0)
return rc;
@@ -65,7 +71,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
wsa_assert (rc != SOCKET_ERROR);
// Bind the socket to the network interface and port.
- rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
+ rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
if (rc == SOCKET_ERROR) {
wsa_error_to_errno ();
return -1;
@@ -131,6 +137,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
+#include <sys/un.h>
zmq::tcp_listener_t::tcp_listener_t () :
s (retired_fd)
@@ -144,44 +151,86 @@ zmq::tcp_listener_t::~tcp_listener_t ()
close ();
}
-int zmq::tcp_listener_t::set_address (const char *addr_)
+int zmq::tcp_listener_t::set_address (const char* protocol_, const char
*addr_)
{
- // Convert the interface into sockaddr_in structure.
- int rc = resolve_ip_interface (&addr, addr_);
- if (rc != 0)
- return rc;
-
- // Create a listening socket.
- s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (s == -1)
- return -1;
-
- // Allow reusing of the address.
- int flag = 1;
- rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
- errno_assert (rc == 0);
- // Set the non-blocking flag.
- flag = fcntl (s, F_GETFL, 0);
- if (flag == -1)
- flag = 0;
- rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
- errno_assert (rc != -1);
-
- // Bind the socket to the network interface and port.
- rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
- if (rc != 0) {
- close ();
- return -1;
+ if ( ::strcmp(protocol_, "tcp") == 0 ) {
+
+ // Convert the interface into sockaddr_in structure.
+ int rc = resolve_ip_interface ((struct sockaddr_in*)&addr, addr_);
+ if (rc != 0)
+ return rc;
+
+ // Create a listening socket.
+ s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (s == -1)
+ return -1;
+
+ // Allow reusing of the address.
+ int flag = 1;
+ rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
+ errno_assert (rc == 0);
+
+ // Set the non-blocking flag.
+ flag = fcntl (s, F_GETFL, 0);
+ if (flag == -1)
+ flag = 0;
+ rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+ errno_assert (rc != -1);
+
+ // Bind the socket to the network interface and port.
+ rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
+ if (rc != 0) {
+ close ();
+ return -1;
+ }
+
+ // Listen for incomming connections.
+ rc = listen (s, tcp_connection_backlog);
+ if (rc != 0) {
+ close ();
+ return -1;
+ }
}
-
- // Listen for incomming connections.
- rc = listen (s, tcp_connection_backlog);
- if (rc != 0) {
- close ();
+ else if ( ::strcmp(protocol_, "ipc") == 0) {
+
+ (void)::unlink(addr_);
+
+ int rc = resolve_local_path((struct sockaddr_un*) &addr, addr_);
+ if (rc != 0)
+ return rc;
+
+ s = socket (AF_LOCAL, SOCK_STREAM, 0);
+ if (s == -1)
+ return -1;
+
+ // Set the non-blocking flag.
+ int flag = fcntl (s, F_GETFL, 0);
+ if (flag == -1)
+ flag = 0;
+ rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+ errno_assert (rc != -1);
+
+ // Bind the socket to the file path.
+ rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_un));
+ if (rc != 0) {
+ close ();
+ return -1;
+ }
+
+ // Listen for incomming connections.
+ rc = listen (s, tcp_connection_backlog);
+ if (rc != 0) {
+ close ();
+ return -1;
+ }
+ }
+ else {
+ errno = EPROTONOSUPPORT;
return -1;
}
+
return 0;
}
@@ -192,6 +241,16 @@ int zmq::tcp_listener_t::close ()
if (rc != 0)
return -1;
s = retired_fd;
+
+ struct sockaddr * sa = (struct sockaddr*) & addr;
+ if ( AF_LOCAL == sa->sa_family )
+ {
+ struct sockaddr_un* sun = (struct sockaddr_un*) & addr;
+ rc = ::unlink(sun->sun_path);
+ if (rc != 0)
+ return -1;
+ }
+
return 0;
}
@@ -239,20 +298,24 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
- // Disable Nagle's algorithm.
- int flag = 1;
- rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
- sizeof (int));
- errno_assert (rc == 0);
+ struct sockaddr * sa = (struct sockaddr*) & addr;
+ if ( AF_INET == sa->sa_family )
+ {
+ // Disable Nagle's algorithm.
+ int flag = 1;
+ rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
+ sizeof (int));
+ errno_assert (rc == 0);
+
#ifdef ZMQ_HAVE_OPENVMS
- // Disable delayed acknowledgements.
- flag = 1;
- rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
- sizeof (int));
- errno_assert (rc != SOCKET_ERROR);
+ // Disable delayed acknowledgements.
+ flag = 1;
+ rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
+ sizeof (int));
+ errno_assert (rc != SOCKET_ERROR);
#endif
-
+ }
return sock;
}
diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp
index 1dfe288..718725b 100644
--- a/src/tcp_listener.hpp
+++ b/src/tcp_listener.hpp
@@ -23,6 +23,8 @@
#include "fd.hpp"
#include "ip.hpp"
+#include <string>
+
namespace zmq
{
@@ -35,10 +37,13 @@ namespace zmq
tcp_listener_t ();
~tcp_listener_t ();
- // Start listening on the interface. Address is in
+ // Start listening on the interface.
+ // If protocol is "tcp" then address is in
// <interface-name>:<port-number> format. Interface name may be '*'
// to bind to all the interfaces.
- int set_address (const char *addr_);
+ // If protocol is "ipc" then address is the
+ // the file pathname.
+ int set_address (const char* protocol_, const char *addr_);
// Close the listening socket.
int close ();
@@ -55,8 +60,8 @@ namespace zmq
private:
// IP address/port to listen on.
- sockaddr_in addr;
-
+ sockaddr_storage addr;
+
// Underlying socket.
fd_t s;
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index d4c2727..e2bd461 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -41,12 +41,13 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
{
}
-int zmq::zmq_connecter_t::set_address (const char *address_)
+int zmq::zmq_connecter_t::set_address (const char* protocol_, const char
*address_)
{
- int rc = tcp_connecter.set_address (address_);
+ int rc = tcp_connecter.set_address (protocol_, address_);
if (rc != 0)
return rc;
address = address_;
+ protocol = protocol_;
return 0;
}
@@ -91,7 +92,7 @@ void zmq::zmq_connecter_t::out_event ()
// Create an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t (
choose_io_thread (options.affinity), owner,
- fd, options, true, address.c_str (), session_ordinal);
+ fd, options, true, protocol.c_str(), address.c_str (),
session_ordinal);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index c3a42a9..88cd478 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -39,8 +39,8 @@ namespace zmq
const options_t &options_, uint64_t session_ordinal_, bool
wait_);
~zmq_connecter_t ();
- // Set IP address to connect to.
- int set_address (const char *address_);
+ // Set address to connect to.
+ int set_address (const char *protocol_, const char *address_);
private:
@@ -77,6 +77,8 @@ namespace zmq
// Address to connect to.
std::string address;
+ // Transport
+ std::string protocol;
zmq_connecter_t (const zmq_connecter_t&);
void operator = (const zmq_connecter_t&);
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index a79c0bd..05b92fb 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -27,7 +27,8 @@
#include "err.hpp"
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
- const options_t &options_, bool reconnect_, const char *address_) :
+ const options_t &options_, bool reconnect_,
+ const char *protocol_, const char *address_) :
io_object_t (parent_),
inpos (NULL),
insize (0),
@@ -39,9 +40,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t
fd_,
options (options_),
reconnect (reconnect_)
{
- if (reconnect)
+ if (reconnect) {
address = address_;
-
+ protocol = protocol_;
+ }
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
zmq_assert (rc == 0);
@@ -166,7 +168,7 @@ void zmq::zmq_engine_t::error ()
inout->get_io_thread (), inout->get_owner (),
options, inout->get_ordinal (), true);
zmq_assert (reconnecter);
- reconnecter->set_address (address.c_str ());
+ reconnecter->set_address (protocol.c_str(), address.c_str ());
}
inout->detach (reconnecter);
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index d26e304..84a0ab3 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -37,7 +37,8 @@ namespace zmq
public:
zmq_engine_t (class io_thread_t *parent_, fd_t fd_,
- const options_t &options_, bool reconnect_, const char
*address_);
+ const options_t &options_, bool reconnect_,
+ const char *protocol_, const char* address_);
~zmq_engine_t ();
// i_engine interface implementation.
@@ -71,6 +72,7 @@ namespace zmq
bool reconnect;
std::string address;
+ std::string protocol;
zmq_engine_t (const zmq_engine_t&);
void operator = (const zmq_engine_t&);
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index c602e1d..70bbd21 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -25,7 +25,7 @@
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options_, bool reconnect_,
- const char *address_, uint64_t session_ordinal_) :
+ const char *protocol_, const char* address_, uint64_t
session_ordinal_) :
owned_t (parent_, owner_),
sent (false),
received (false),
@@ -34,7 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_,
socket_base_t *owner_,
{
// Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options,
- reconnect_, address_);
+ reconnect_, protocol_,
address_);
zmq_assert (engine);
}
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 414adfe..5a131f0 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -41,7 +41,7 @@ namespace zmq
zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options_, bool reconnect_,
- const char *address_, uint64_t session_ordinal_);
+ const char *protocol_, const char* address_, uint64_t
session_ordinal_);
~zmq_init_t ();
private:
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 9ccd82b..4fcc4a9 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -36,9 +36,9 @@ zmq::zmq_listener_t::~zmq_listener_t ()
{
}
-int zmq::zmq_listener_t::set_address (const char *addr_)
+int zmq::zmq_listener_t::set_address (const char* protocol_, const char
*addr_)
{
- return tcp_listener.set_address (addr_);
+ return tcp_listener.set_address (protocol_, addr_);
}
void zmq::zmq_listener_t::process_plug ()
@@ -65,7 +65,7 @@ void zmq::zmq_listener_t::in_event ()
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_init_t *init = new (std::nothrow) zmq_init_t (
- io_thread, owner, fd, options, false, NULL, 0);
+ io_thread, owner, fd, options, false, NULL, NULL, 0);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp
index 6f8cdc9..e179903 100644
--- a/src/zmq_listener.hpp
+++ b/src/zmq_listener.hpp
@@ -31,13 +31,13 @@ namespace zmq
class zmq_listener_t : public owned_t, public io_object_t
{
- public:
+ public:
- zmq_listener_t (class io_thread_t *parent_, socket_base_t *owner_,
- const options_t &options_);
+ zmq_listener_t (class io_thread_t *parent_, socket_base_t
*owner_,
+ const options_t &options_);
- // Set IP address to listen on.
- int set_address (const char *addr_);
+ // Set address to listen on.
+ int set_address (const char* protocol_, const char *addr_);
private:
More information about the zeromq-dev
mailing list