[zeromq-dev] Session handling behind REQ/REP, etc.

Oliver Smith oliver at kfs.org
Thu Dec 16 23:07:38 CET 2010


I regularly see requests for "how do I tell if ... disconnected" etc.

I'm currently struggling to find a way to associate a message received 
over a multiplexed PULL/PUSH and/or REQ/REP socket with the previous hop, 
so that I can locally match incoming messages to cached data etc. 
without needing to send that information as part of the message payload.

I wonder if these two seemingly unrelated issues could simply be handled 
by providing ZMQ with two additional setsockopts:

     struct userdata_t { } ;

     typedef userdata_t* (attach_fn)(void) ;
     socket_t { ...  setsockopt(zmq.ATTACHUSERDATA, attach_fn) ;  ... }

     typedef void (detach_fn)(userdata_t*) ;
     socket_t { ... setsockopt(zmq.DETACHUSERDATA, detach_fn) ; ... }

When ZMQ internally handles a new incoming connection, has finished 
with the setup process and is ready to begin receiving messages on 
said connection, it will call the registered attach_fn for that 
socket_t. Where possible, the user can either derive their user data 
class from userdata_t or else simply cast a pointer to their data type 
to userdata_t*.

When a connection terminates, the registered detach_fn is called with 
the user data. I can't decide whether it would be a good idea to allow 
this to happen regardless of the value of the user data: I can see use 
cases where you might just want to know when connections are lost 
without assigning userdata to them, in which case you might want your 
detach_fn called with NULL.
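
To make the attach/detach lifecycle concrete, here is a minimal sketch that runs without ZMQ at all. Everything in it is hypothetical: mock_socket_t, set_attach, set_detach, on_connect, on_disconnect and the integer connection ids are stand-ins invented for illustration, and userdata_t is given a virtual destructor here (an assumption not in the proposal above) so that deleting through the base pointer is safe. It shows the variant where detach_fn is called even when no user data was attached.

```cpp
#include <cstddef>
#include <map>

// Hypothetical stand-ins for the proposed API; nothing here exists in ZMQ.
struct userdata_t { virtual ~userdata_t() {} } ;
typedef userdata_t* (attach_fn)(void) ;
typedef void (detach_fn)(userdata_t*) ;

// Toy socket that only tracks per-connection user data.
class mock_socket_t
{
public:
     mock_socket_t() : attach_(NULL), detach_(NULL) {}

     // Tear down any connections still open so detach_fn sees them all.
     ~mock_socket_t()
     {
         while ( !conns_.empty() )
             on_disconnect(conns_.begin()->first) ;
     }

     void set_attach(attach_fn* fn) { attach_ = fn ; }
     void set_detach(detach_fn* fn) { detach_ = fn ; }

     // Connection is fully set up: ask the user for its data, if asked to.
     void on_connect(int id)
     {
         on_disconnect(id) ;            // drop any stale entry for this id
         userdata_t* ud = NULL ;
         if ( attach_ ) ud = attach_() ;
         conns_[id] = ud ;
     }

     // Connection terminated: hand the data back, even if it is NULL.
     void on_disconnect(int id)
     {
         std::map<int, userdata_t*>::iterator it = conns_.find(id) ;
         if ( it == conns_.end() ) return ;
         if ( detach_ ) detach_(it->second) ;
         conns_.erase(it) ;
     }

     // Returns NULL for unknown ids or connections without user data.
     userdata_t* user_data(int id) const
     {
         std::map<int, userdata_t*>::const_iterator it = conns_.find(id) ;
         if ( it == conns_.end() ) return NULL ;
         return it->second ;
     }

     std::size_t connection_count() const { return conns_.size() ; }

private:
     mock_socket_t(const mock_socket_t&) ;              // not copyable
     mock_socket_t& operator=(const mock_socket_t&) ;   // not assignable

     attach_fn* attach_ ;
     detach_fn* detach_ ;
     std::map<int, userdata_t*> conns_ ;
} ;

// Example callbacks: count live sessions and NULL detaches.
struct Counter : public userdata_t { int messages ; Counter() : messages(0) {} } ;

static int g_live = 0 ;
static int g_null_detaches = 0 ;

userdata_t* count_attach() { ++g_live ; return new Counter ; }

void count_detach(userdata_t* ud)
{
     if ( ud == NULL ) { ++g_null_detaches ; return ; }   // "connection lost" only
     --g_live ;
     delete ud ;   // safe here because userdata_t has a virtual destructor
}
```

With only count_detach registered, a connect followed by a disconnect leaves g_null_detaches at 1, which is the "just tell me when connections are lost" case. With count_attach registered as well, g_live rises on connect and falls back on disconnect.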

Example code:

struct Session : public zmq::userdata_t
{
     enum State { _Invalid, _Connected, _Authenticated } ;

     Session() : connectedAt(time(NULL)), state(_Connected), crypt() {}
     // The destructor could probably do something more...
     // such as send a fake "hangup" zmq::message_t downstream
     // if the sender has hung up unexpectedly, or put the session
     // onto an expiry queue yadda yadda..
     ~Session() {}

     time_t connectedAt ;
     State state ;
     EncryptionStuff crypt ;    /// public/private keys for per-packet 'cryption.

     bool authenticate(const zmq::message_t& msg) ;
     void process(const zmq::message_t& msg) ;
} ;

zmq::userdata_t* myAttachFn()
{
     Session* const session = new Session ;
     return (zmq::userdata_t*)session ;
}

void myDetachFn(zmq::userdata_t* session)
{
     if ( session == NULL ) return ;
     // userdata_t has no virtual destructor, so delete via the real type.
     delete static_cast<Session*>(session) ;
}

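bool Session::authenticate(const zmq::message_t& msg)
{
     // First packet should contain "hello world".
     // Message payloads are not NUL-terminated, so compare by length
     // (this assumes the sender does not include a trailing NUL).
     static const char expected[] = "hello world" ;
     const size_t len = sizeof(expected) - 1 ;
     return ( msg.size() == len && memcmp(msg.data(), expected, len) == 0 ) ;
}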

void Session::process(const zmq::message_t& msg)
{
     switch ( state )
     {
     case    _Invalid:
         // Session has gone bad, ignore it.
         return  ;
     case    _Connected:
         // This is the first message from this connection.
         if ( authenticate(msg) )
             state = _Authenticated ;
         else
             state = _Invalid ;
     break ;
     case _Authenticated:
         // Payload is not NUL-terminated, so print with an explicit length.
         printf("%.*s\n", (int)msg.size(), (const char*)msg.data()) ;
     break ;
     }
}

int main(const int argc, const char* const argv[])
{
     zmq::context_t c(1) ;
     zmq::socket_t s(c, ZMQ_PULL) ;

     s.bind("tcp://0.0.0.0:54321") ;

     s.setsockopt(zmq.ATTACHUSERDATA, myAttachFn) ;
     s.setsockopt(zmq.DETACHUSERDATA, myDetachFn) ;

     while ( true )
     {
         zmq::message_t msg ;
         s.recv(msg) ;

         Session* const session = (Session*)s.getUserData() ;
         // or msg.getUserData?

         assert( session != NULL ) ;    // how could it be?

         session->process(msg) ;
     }
}

- Oliver



