[zeromq-dev] P/Invoke-based CLR access to zeroMQ

Tamara Kustarova kustarova at fastmq.com
Fri Mar 6 12:38:17 CET 2009


Hello Barak,

I've tried the C# version of zmq that you suggested. I needed to make 
some changes to it in order to make the calling conventions compatible. 
I tested it with the cs_remote_thr and cs_local_thr scenarios, and if I set 
the roundtrip count too large (i.e. I try to send a large number of messages), 
the application that sends the data crashes with System.AccessViolationException.

Do you have any hints? I am addressing this mail to you, Barak, 
because you wrote the code, but anybody else may well have a good idea 
why this is happening.

The previous implementation using dnzmq.dll written in C++ works fine 
with cs_remote_thr and cs_local_thr, so the problem is probably not 
in these two programs.

The code of C# dll looks like this.
using System;
using System.Runtime.InteropServices;

namespace zmq
{

    public class Dnzmq : IDisposable
    {
        private IntPtr zmq_;

        public const int SCOPE_LOCAL = 0;
        public const int SCOPE_GLOBAL = 1;

        public Dnzmq()
        {
            zmq_ = IntPtr.Zero;
        }

        public Dnzmq(string host)
        {
            Open(host);
        }
       
        public void Open(string host)
        {
            zmq_ = czmq_create(host);
        }

        public bool IsOpen { get { return zmq_ == IntPtr.Zero; } }

        public int create_exchange (string exchange, int scope, string nic)
        {
            if (zmq_ == IntPtr.Zero)
                throw new NullReferenceException("queue must be 
initialized");
            return czmq_create_exchange(zmq_, exchange, scope, nic);
        }

        public int create_queue (string queue, int scope, string nic, 
Int64 hwm, Int64 lwm, Int64 swapSize)
        {
            if (zmq_ == IntPtr.Zero)
                throw new NullReferenceException("queue must be 
initialized");
            return czmq_create_queue(zmq_, queue, scope, nic, hwm, lwm, 
swapSize);
        }

        public void bind(string exchange, string queue, string 
exchangeArgs, string queueArgs)
        {
            if (zmq_ == IntPtr.Zero)
                throw new NullReferenceException("queue must be 
initialized");
            czmq_bind(zmq_, exchange, queue, exchangeArgs, queueArgs);
        }

        public void send(int eid, byte[] data)
        {
            if (zmq_ == IntPtr.Zero)
                throw new NullReferenceException("queue must be 
initialized");
            IntPtr ptr = Marshal.AllocHGlobal(data.Length);
            Marshal.Copy(data, 0, ptr, data.Length);
            czmq_send(zmq_, eid, ptr, Convert.ToUInt32(data.Length), 
Marshal.FreeHGlobal);
        }

        public byte[] receive()
        {
            if (zmq_ == IntPtr.Zero)
                throw new NullReferenceException("queue must be 
initialized");
            IntPtr data;
            UInt32 dataSize;
            FreeMsgData freeFunc;
            czmq_receive (zmq_, out data, out dataSize, out freeFunc);

            if (data == IntPtr.Zero)
                return new byte[0];

            byte[] msg = new byte[dataSize];
            Marshal.Copy(data, msg, 0, Convert.ToInt32(dataSize));
            if (freeFunc != null)
                freeFunc(data);
            return msg;
        }

        [UnmanagedFunctionPointer (CallingConvention.Cdecl)]
        private delegate void FreeMsgData(IntPtr ptr);
   
        public void Close()
        {
            if (zmq_ != IntPtr.Zero)
            {
                czmq_destroy(zmq_);
                zmq_ = IntPtr.Zero;
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            Close();
        }

        #endregion

        #region C API

        [DllImport ("..\\..\\..\\..\\Debug\\libczmq.dll", CharSet = 
CharSet.Ansi)]
        static extern IntPtr czmq_create(string host);

        [DllImport("..\\..\\..\\..\\Debug\\libczmq.dll")]
        static extern void czmq_destroy(IntPtr zmq);

        [DllImport("..\\..\\..\\..\\Debug\\libczmq.dll", 
CharSet=CharSet.Ansi)]
        static extern int czmq_create_exchange (IntPtr zmq, string 
exchange, int scope, string nic);

        [DllImport("..\\..\\..\\..\\Debug\\libczmq.dll", 
CharSet=CharSet.Ansi)]
        static extern int czmq_create_queue (IntPtr zmq, string queue, 
int scope, string nic,
            Int64 hwm, Int64 lwm, Int64 swapSize);

        [DllImport("..\\..\\..\\..\\Debug\\libczmq.dll", 
CharSet=CharSet.Ansi)]
        static extern void czmq_bind (IntPtr zmq, string exchange, 
string queue,
            string exchangeArgs, string queueArgs);

        [DllImport("..\\..\\..\\..\\Debug\\libczmq.dll", 
CallingConvention = CallingConvention.Cdecl)]
        static extern void czmq_send (IntPtr zmq, int eid, IntPtr data_, 
UInt32 size, FreeMsgData ffn);

        [DllImport("..\\..\\..\\..\\Debug\\libczmq.dll", 
CallingConvention = CallingConvention.Cdecl)]
        static extern void czmq_receive (IntPtr zmq, [Out] out IntPtr 
data, [Out] out UInt32 size, [Out] out FreeMsgData ffn);

        #endregion
    }
}




More information about the zeromq-dev mailing list