[zeromq-dev] Strange bug with dealer socket

Antti Karanta Antti.Karanta at neromsoft.com
Tue May 19 13:32:50 CEST 2015


Hi,

I am writing an asynchronous request-response client and server using 
dealer and router sockets (one of each), something that I believe is 
fairly common with zeromq. I read the examples and wrote my code (C# / 
Clojure CLR).

I have a dedicated thread handling the dealer socket, and it communicates 
with other threads via concurrent queues, so only that single dedicated 
thread should ever access the socket.

The dealer socket exhibits very odd behavior: if it is only written to, 
it sends the data nicely. However, if I also try to read from it (and, 
due to the structure of the code, this easily happens before any data is 
written to the socket), strange things happen. Instead of just sending a 
two-part message with 4 and 212 bytes, the dealer socket goes berserk.

First it sends a single message with 39112 parts. All parts but the last 
are four bytes long (zero, at least for the most part); the second to 
last is the real four-byte part and the last is the real 212 bytes of 
payload. This is followed by over 30000 messages that each contain, in 
addition to the first frame added by the receiving router, one frame of 
212 bytes (likely my data payload again; I did not check).
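
To make the threading arrangement described above concrete, here is a 
minimal sketch of a single thread owning the socket while other threads 
reach it only through concurrent queues. It is not the real code: 
IFrameSocket and LoopbackSocket are hypothetical stand-ins for the clrzmq 
socket so that the sketch runs on its own, and the queue and method names 
are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the dealer socket so the sketch runs without clrzmq.
interface IFrameSocket {
    void Send( IList<byte[]> frames );
    IList<byte[]> TryReceive();   // returns null when nothing is waiting
}

// Hypothetical loopback: hands back whatever was sent. Deliberately not
// thread safe, so only one thread may touch it.
class LoopbackSocket : IFrameSocket {
    private readonly Queue<IList<byte[]>> _pending = new Queue<IList<byte[]>>();
    public void Send( IList<byte[]> frames ) { _pending.Enqueue( frames ); }
    public IList<byte[]> TryReceive() {
        return _pending.Count > 0 ? _pending.Dequeue() : null;
    }
}

static class DealerThreadSketch {

    // Sends one two-frame request through the owning thread and returns the reply.
    public static IList<byte[]> RoundTrip() {
        var outgoing = new ConcurrentQueue<IList<byte[]>>();
        var incoming = new ConcurrentQueue<IList<byte[]>>();
        var stop = new ManualResetEventSlim( false );

        var owner = new Thread( () => {
            // The socket is created and used on this thread only.
            IFrameSocket socket = new LoopbackSocket();
            while ( !stop.IsSet ) {
                IList<byte[]> toSend;
                while ( outgoing.TryDequeue( out toSend ) ) socket.Send( toSend );
                var reply = socket.TryReceive();
                if ( reply != null ) incoming.Enqueue( reply );
                else Thread.Sleep( 1 );   // nothing to read yet; avoid a busy spin
            }
        } );
        owner.Start();

        // Any other thread talks to the socket only through the queues.
        outgoing.Enqueue( new[] { BitConverter.GetBytes( 1 ), new byte[ 212 ] } );
        IList<byte[]> answer;
        while ( !incoming.TryDequeue( out answer ) ) Thread.Sleep( 1 );

        stop.Set();
        owner.Join();
        return answer;
    }

    static void Main() {
        var answer = RoundTrip();
        Console.WriteLine( "frames: " + answer.Count + ", payload bytes: " + answer[ 1 ].Length );
    }
}
```

The point of the design is that the socket object never crosses a thread 
boundary; only the frame lists do, via the two queues.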

I tried reproducing the problem in a simplified form, but there 
everything seems to work smoothly. I have read through the real code a 
dozen times, and as far as zeromq is concerned it really seems to be 
doing the same thing as the simplified version.

I am fairly certain that the problem is at the dealer socket's end as I 
paired my simplified code with the real dealer and router code 
separately and the problem only occurred when using the real dealer code.

Now, the problem is surely in my own code, but I would like to know 
whether anyone else has had this sort of problem and, if so, what the 
problem / solution turned out to be. I would also welcome any ideas about 
what could be causing the socket to misbehave like this, where to look, 
or what to try. Thanks.


  Environment: zmq version 4.0.4, using clrzmq wrappers, Windows 7 on x64.


          ::Antti::


   P.S. Here's the code I wrote while trying to reproduce the problem. As 
I said, this works, so the problem is not apparent here. I just included 
it to show what I am trying to do.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;

using ZeroMQ;
using System.Diagnostics;

namespace zmqtest {

     class ZmqDealerRouterExperiment {

         private static readonly int Port = 2112;

         private static readonly object _writelineLock = new object();
         private static ZmqContext _context;

         private static void WriteLine( string message ) {
             lock ( _writelineLock ) {
                 Console.WriteLine( message );
             }
         }

         private static void Write( string message ) {
             lock ( _writelineLock ) {
                 Console.Write( message );
             }
         }

         static void Main( string[] args ) {
             _context = ZmqContext.Create();
             new Thread( RunRouter ).Start();
             new Thread( RunDealer ).Start();
             WriteLine( "app started" );
         }

         private static void RunRouter() {
             var router = CreateRouter();
             WriteLine( "router created" );
             int reqNr = 1;
             while ( true ) {
                 var req = ReceiveRequest( router );
                 if ( req != null && req.Count > 0 ) {
                     Write( string.Format( "req {0,6}: ", reqNr++ ) );
                     if ( req.Count != 3 ) {
                         WriteLine( "wrong number of parts in request: " + req.Count
                             + " nbytes: " + string.Join( ",", req.Select( r => r.Length.ToString() ) ) );
                     } else if ( req[1].Length < 4 || req[2].Length < 4 ) {
                         WriteLine( "unexpected response lengths "
                             + string.Join( ",", req.Select( r => r.Length.ToString() ) ) );
                     } else {
                         byte[] clientId;
                         int requestId, requestPayload;
                         ParseRequest( req, out clientId, out requestId, out requestPayload );
                         WriteLine( "got request, id " + requestId + ", payload " + requestPayload
                             + ", total bytes " + req[ 2 ].Length );
                         var answer = CalculateAnswer( req, requestPayload );
                         SendAll( router, answer );
                         WriteLine( "answer to " + requestId + " sent" );
                     }
                 }
             }
         }

         private static ZmqSocket CreateRouter() {
             var rsocket = _context.CreateSocket( SocketType.ROUTER );
             rsocket.SendHighWatermark = 0;
             rsocket.ReceiveHighWatermark = 0;
             rsocket.ReceiveTimeout = TimeSpan.FromMilliseconds( 100 );
             rsocket.RouterBehavior = RouterBehavior.Report;
             rsocket.Bind( "tcp://*:" + Port );
             return rsocket;
         }

         private static void ParseRequest( IList<byte[]> request, out byte[] clientId,
                 out int requestId, out int requestPayload ) {
             clientId = request.First();
             requestId = BitConverter.ToInt32( request[ 1 ], 0 );
             requestPayload = BitConverter.ToInt32( request[ 2 ], 0 );
         }

         private static IList<byte[]> CalculateAnswer( IList<byte[]> req, int payload ) {
             return new [] { req.First(), req[ 1 ], BitConverter.GetBytes( payload + 100 ) };
         }

         private static void SendAll( ZmqSocket socket, IList<byte[]> byteArrs ) {
             if ( byteArrs.Count > 1 ) {
                 for ( int i = 0 ; i < byteArrs.Count - 1 ; i++ ) {
                     socket.SendMore( byteArrs[ i ] );
                 }
             }
             socket.Send( byteArrs.Last() );
         }

         private static IList<byte[]> ReceiveRequest( ZmqSocket router ) {
             var req = router.ReceiveMessage( TimeSpan.FromMilliseconds( 200 ) );
             if ( req == null || req.Count() == 0 ) return null;
             if ( !req.IsComplete ) {
                 WriteLine( "received incomplete message - how is this possible? "
                     + "Zmq is supposed to guarantee receiving all parts or none at all." );
                 return null;
             } else {
                 return req.Select( frame => frame.Buffer ).ToList();
             }
         }

         private static void RunDealer() {
             var dealer = CreateDealer();
             WriteLine( "dealer created" );
             for ( int i = 0 ; i < 2 ; i++ ) {
                 var nothing = dealer.ReceiveMessage( TimeSpan.FromMilliseconds( 100 ) );
                 if ( !nothing.IsComplete ) WriteLine( "got incomplete message" );
                 if ( !nothing.IsEmpty ) {
                     WriteLine( "got something for nothing" );
                 }
             }
             for ( int i = 0 ; i < 10 ; i++ ) {
                 int reqId = i, payload = i + 20;
                 dealer.SendMore( BitConverter.GetBytes( reqId ) );
                 var payloadBytes = new byte[ 212 ];
                 BitConverter.GetBytes( payload * 10 ).CopyTo( payloadBytes, 0 );
                 for ( int j = 4 ; j < payloadBytes.Length ; j++ ) {
                     payloadBytes[ j ] = (byte)j;
                 }
                 dealer.Send( payloadBytes );
             }
             while ( true ) {
                 var answerMsg = dealer.ReceiveMessage( TimeSpan.FromMilliseconds( 100 ) );
                 if ( answerMsg.Count() > 0 ) {
                     if ( answerMsg.Count() != 2 ) {
                         WriteLine( "wrong number of frames in result " + answerMsg.Count() );
                     } else {
                         var bytes = answerMsg.Select( frame => frame.Buffer ).ToArray();
                         var reqId = BitConverter.ToInt32( bytes[ 0 ], 0 );
                         var result = BitConverter.ToInt32( bytes[ 1 ], 0 );
                         WriteLine( "the answer to request " + reqId + " is " + result
                             + ", total bytes " + bytes[ 1 ].Length );
                     }
                 } else {
                     Debug.Assert( answerMsg.IsEmpty, "not empty" );
                     Debug.Assert( !answerMsg.IsComplete, "complete" );
                 }
             }
         }

         static ZmqSocket CreateDealer() {
             var dealer = _context.CreateSocket( SocketType.DEALER );
             dealer.SendHighWatermark = 0;
             dealer.ReceiveHighWatermark = 0;
             dealer.Connect( "tcp://localhost:" + Port );
             return dealer;
         }

     }

}





