ServiceMq Hits 10,000 Downloads

I am pleased to see this milestone of 10,000 downloads in the short history of ServiceMq and its underlying communication library ServiceWire, a faster and simpler alternative to WCF for .NET to .NET RPC. The source code for these libraries can be found on GitHub.

Over the past few weeks both libraries have been improved.

ServiceMq improvements include:

  • Options for the persistence of messages asynchronously to improve overall throughput when message traffic is high
  • ReceiveBulk and AcceptBulk methods were introduced
  • Message caching was refactored to improve performance and limit memory use in scenarios where large numbers of messages are sent and must wait for a destination to become available or received and must wait to be consumed
  • Faster asynchronous file deletion was added which eliminates the standard File.Delete’s permission demand on every message file delete
  • Asynchronous append file logging was added to improve throughput
  • The FastFile class was refactored to support IDisposable and now dedicates a single thread each to asynchronous delete, append and write operations
  • Upgraded to ServiceWire 1.6.3
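
The idea of dedicating a single thread to an asynchronous file operation can be sketched roughly as follows. This is a minimal illustration and not the FastFile source: the class name BackgroundAppender and all of its members are hypothetical, and it covers only the append case.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical sketch: one dedicated thread drains a queue of append
// requests so that callers never wait on disk I/O.
public sealed class BackgroundAppender : IDisposable
{
    private readonly BlockingCollection<Tuple<string, string>> _queue =
        new BlockingCollection<Tuple<string, string>>();
    private readonly Thread _worker;

    public BackgroundAppender()
    {
        _worker = new Thread(Drain) { IsBackground = true };
        _worker.Start();
    }

    // Returns immediately; the write happens later on the worker thread.
    public void Append(string path, string line)
    {
        _queue.Add(Tuple.Create(path, line));
    }

    private void Drain()
    {
        // GetConsumingEnumerable blocks until items arrive and ends once
        // CompleteAdding has been called and the queue is empty.
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            try
            {
                File.AppendAllText(item.Item1, item.Item2 + Environment.NewLine);
            }
            catch (IOException)
            {
                // A real implementation would log or retry; this sketch drops the line.
            }
        }
    }

    // Lets the worker finish any pending writes, then stops it.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Join();
        _queue.Dispose();
    }
}
```

Because a single thread performs the writes, lines are appended in the order they were queued, and disposing the object waits for pending writes to finish.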

ServiceWire has had two minor but important bugs fixed:

  • Code was refactored to properly dispose of resources when a connection failure occurs.
  • Previously if the host was not hosting the same assembly version of the interface being used, the connection would hang. This scenario now properly throws an identifiable exception on the client and disposes of the underlying socket or named pipe stream.

Real World Use

In the last month or so, I have had the opportunity to use both of these libraries extensively at work. All of the recent improvements are a direct or indirect result of that real world use. Without disclosing work related details, I believe it is safe to say that these libraries are moving hundreds of messages per second and in some cases 30GB of data between two machines in around three minutes across perhaps 300 RPC method invocations. Some careful usage has been required given our particular use cases in order to reduce connection contention from many thousands of message writer threads across a pool of servers all talking to a single target server. I’ve no doubt that a little fine tuning on the usage side may be required, but overall I’m very happy with the results.

I hope you enjoy these libraries and please contact me if you find any problems with them or need additional functionality. Better yet, jump onto GitHub and submit a pull request of your own. I am happy to evaluate and accept well thought out requests that are in line with my vision for keeping these libraries lightweight and easy to use.

One other note

I recently published ServiceMock, a tiny experimental mocking library that has surprisingly been downloaded over 500 times. If you’re one of the crazy ones, I’d love to hear from you and what you think of it.

ServiceWire and ServiceMq Improvements

Over the past several days, I’ve been working on a day-job project that may be using ServiceMq which uses ServiceWire under the covers. I say “may” because it depends on whether the prototype proves to be sufficiently reliable and efficient. The prototype is really more of an extended integration test with the following requirements:

  • Blocking Send method that throws if sending fails.
    • Tries the primary destination first.
    • Alternative destinations are tried successively until the message is successfully sent.
    • Control over send connection timeout failure to enable “fail fast.”
  • Standard caching receive.

These requirements exist because the senders are transient and may not be restarted should they fail. The senders also need immediate feedback because the action is part of a transaction involving other operations.

The first order of business was to add the Flash class to ServiceMq. This evolved to become the Flasher class which implements IDisposable in order to take advantage of client connection pooling using the updated PooledDictionary class in ServiceWire (more on this later). The Flasher’s Send method allows you to send a message to a primary destination with zero to many alternate destinations.

[TestMethod]
public void FlashDestDownTcpTest()
{
   var qfrom = new Address(Dns.GetHostName(), 8966);
   var q1Address = new Address(Dns.GetHostName(), 8967);
   var q2Address = new Address(Dns.GetHostName(), 8968);

   // create and use Flasher in using to guarantee Dispose call
   using (var flash = new Flasher(qfrom))
   {
      // create a receiving queue
      using (var q2 = new MessageQueue("qf2", q2Address, @"c:\temp\qf2"))
      {
         // send to primary which does not exist - received on secondary q2
         var id = flash.Send(q1Address, "my test message", q2Address);
         var msg = q2.Receive();
         Assert.IsTrue(msg.Id == id);
      }

      using (var q1 = new MessageQueue("qf1", q1Address, @"c:\temp\qf1"))
      {
         // send to primary - received on primary
         var id = flash.Send(q1Address, "my test message", q2Address);
         var msg = q1.Receive();
         Assert.IsTrue(msg.Id == id);
      }

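      // demonstrate Send throws when neither receiver is "up";
      // the flag makes the test fail if no exception is thrown at all
      var threw = false;
      try
      {
         flash.Send(q1Address, "my test message", q2Address);
      }
      catch (System.Net.WebException)
      {
         threw = true;
      }
      Assert.IsTrue(threw, "Send should throw when no destination is up");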
   }
}
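
The "primary first, then alternates" behavior exercised by the test above can be sketched on its own. This is not the Flasher implementation: FailoverSender, SendWithFailover and the trySend delegate are hypothetical names, destinations are plain strings, and the sketch reports total failure with an AggregateException rather than the WebException the test expects from Flasher.

```csharp
using System;
using System.Collections.Generic;

public static class FailoverSender
{
    // Tries the primary destination first, then each alternate in order.
    // 'trySend' is a caller-supplied delegate (an assumption of this sketch)
    // that throws when a destination cannot be reached.
    public static string SendWithFailover(
        Action<string> trySend, string primary, params string[] alternates)
    {
        var failures = new List<Exception>();
        var destinations = new List<string> { primary };
        destinations.AddRange(alternates);

        foreach (var destination in destinations)
        {
            try
            {
                trySend(destination);
                return destination; // first destination that accepted the message
            }
            catch (Exception e)
            {
                failures.Add(e); // remember the failure and try the next destination
            }
        }

        // Every destination failed, so the blocking call must throw.
        throw new AggregateException("Send failed for all destinations.", failures);
    }
}
```

Returning the destination that accepted the message gives the caller the immediate feedback the requirements call for, and a short connection timeout inside trySend is what makes each failed attempt "fail fast."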

In order to support a more robust client side connection timeout, a significant improvement was made to ServiceWire. The TcpEndPoint class was introduced and an overloaded constructor was added to TcpChannel which allows you to specify a connection timeout value when creating an instance of TcpClient<T> (see below). This involved use of the Socket class’s ConnectAsync method with a SocketAsyncEventArgs object.

private void Initialize(Type serviceType, 
   IPEndPoint endpoint, int connectTimeoutMs)
{
   _serviceType = serviceType;
   _client = new Socket(AddressFamily.InterNetwork, 
                        SocketType.Stream, ProtocolType.Tcp);
   _client.LingerState.Enabled = false;

   var connected = false;
   var connectEventArgs = new SocketAsyncEventArgs
   {
      // must designate the server you want to connect to
      RemoteEndPoint = endpoint
   };
   connectEventArgs.Completed 
      += new EventHandler<SocketAsyncEventArgs>((sender, e) =>
         {
            connected = true;
         });

   if (_client.ConnectAsync(connectEventArgs))
   {
      // operation pending (false means it completed synchronously);
      // wait for the Completed handler or give up after the timeout
      if (!SpinWait.SpinUntil(() => connected, connectTimeoutMs))
      {
         if (null != _client) _client.Dispose();
         throw new TimeoutException("Unable to connect within " 
                                    + connectTimeoutMs + "ms");
      }
   }
   if (connectEventArgs.SocketError != SocketError.Success)
   {
      if (null != _client) _client.Dispose();
      throw new SocketException((int)connectEventArgs.SocketError);
   }

   if (!_client.Connected) throw new SocketException(); 
   _stream = new BufferedStream(new NetworkStream(_client), 8192);
   _binReader = new BinaryReader(_stream);
   _binWriter = new BinaryWriter(_stream);
   SyncInterface(_serviceType);
}

The final problem to solve was TCP/IP port exhaustion. My original implementation had the sending client being created and taken down with each call to the Send method. The construction overhead is minimal, but the connect time and port exhaustion quickly become a problem when there are potentially many threads sending messages from the same client machine.

To solve the problem, I used a connection pooling strategy that involved making the PooledDictionary<TKey, TValue> implement the IDisposable interface to allow for easy disposal of pooled client objects. The Send method then uses one of two client pools, depending on whether it is a local Named Pipes connection or a TCP connection.

private void SendMessage(OutboundMessage message)
{
   NpClient<IMessageService> npClient = null;
   TcpClient<IMessageService> tcpClient = null;
   IMessageService proxy = null;
   var poolKey = message.To.ToString();
   try
   {
      // determine whether to use NamedPipes or Tcp
      var useNpClient = false;
      if (message.To.Transport == Transport.Both)
      {
         if (message.To.ServerName == message.From.ServerName)
         {
            useNpClient = true;
         }
      }
      else if (message.To.Transport == Transport.Np) useNpClient = true;

      // request a client from the pool, providing a way to create a new one
      if (useNpClient)
      {
         npClient = npClientPool.Request(poolKey, 
            () => new NpClient<IMessageService>(
               new NpEndPoint(message.To.PipeName, _connectTimeOutMs)));
         proxy = npClient.Proxy;
      }
      else
      {
         tcpClient = tcpClientPool.Request(poolKey,
            () => new TcpClient<IMessageService>(new TcpEndPoint(
               new IPEndPoint(IPAddress.Parse(message.To.IpAddress), 
                  message.To.Port), _connectTimeOutMs)));
         proxy = tcpClient.Proxy;
      }

      // send the message via the proxy RPC Enqueue* method
      if (null == message.MessageBytes)
      {
         proxy.EnqueueString(message.Id, message.From.ToString(), 
            message.Sent, message.SendAttempts,
            message.MessageTypeName, message.MessageString);
      }
      else
      {
         proxy.EnqueueBytes(message.Id, message.From.ToString(), 
            message.Sent, message.SendAttempts,
            message.MessageTypeName, message.MessageBytes);
      }
   }
   finally
   {
      if (null != tcpClient) tcpClientPool.Release(poolKey, tcpClient);
      if (null != npClient) npClientPool.Release(poolKey, npClient);
   }
}
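
For readers who have not seen the pool itself, the Request/Release pattern used above can be approximated with a small keyed pool. This is a sketch under assumptions, not the ServiceWire PooledDictionary source: the KeyedPool name and its internals are hypothetical, and only the Request(key, factory) and Release(key, item) call shapes are taken from the code above.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical keyed pool that keeps idle, reusable objects per key.
public sealed class KeyedPool<TKey, TValue> : IDisposable
    where TValue : class, IDisposable
{
    private readonly ConcurrentDictionary<TKey, ConcurrentBag<TValue>> _pools =
        new ConcurrentDictionary<TKey, ConcurrentBag<TValue>>();

    // Returns an idle instance for the key, or creates one with the factory.
    public TValue Request(TKey key, Func<TValue> factory)
    {
        var bag = _pools.GetOrAdd(key, k => new ConcurrentBag<TValue>());
        TValue item;
        return bag.TryTake(out item) ? item : factory();
    }

    // Hands an instance back so that a later Request can reuse it.
    public void Release(TKey key, TValue item)
    {
        _pools.GetOrAdd(key, k => new ConcurrentBag<TValue>()).Add(item);
    }

    // Disposes every idle instance still held by the pool.
    public void Dispose()
    {
        foreach (var bag in _pools.Values)
        {
            TValue item;
            while (bag.TryTake(out item)) item.Dispose();
        }
        _pools.Clear();
    }
}
```

Keying the pool by destination means an open connection is reused only for the endpoint it is already connected to, which is what avoids opening a new port for every Send. A production pool would likely also cap its size and discard clients whose connections have failed.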

It remains to be seen whether these changes will result in a sufficiently robust message passing and queuing system to allow it to be used on my day job project. More testing and prototyping is required. There are alternatives to which I can fall back, but none of them are trivial and all of them are less desirable. Given my personal bias, I must take extra care to scrutinize this possible solution and abandon it should it prove to be insufficient.

Broadcast Added to ServiceMq

I have added a Broadcast method to distribute in guaranteed order a single message to multiple destinations. If one of those destinations is down, message delivery will resume when it becomes available again. If the message cannot be delivered in 24 hours, the message will get logged to the failed log.

The new version of this library also creates sent, read and failed log files by minute to ensure the files are not too large when there is a very high number of messages flowing. It will also remove the sent and read files after 48 hours, a value you can change in the constructor.
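
As a rough illustration of per-minute log files with age-based cleanup, consider the sketch below. The RollingLog class, the file name pattern and the method names are all assumptions made for this example; the library's actual naming scheme and cleanup schedule may differ.

```csharp
using System;
using System.Globalization;
using System.IO;

public static class RollingLog
{
    // Builds a file name that changes every minute,
    // for example sent-201405011237.log (hypothetical naming scheme).
    public static string PathFor(string directory, string prefix, DateTime utcNow)
    {
        var stamp = utcNow.ToString("yyyyMMddHHmm", CultureInfo.InvariantCulture);
        return Path.Combine(directory, prefix + "-" + stamp + ".log");
    }

    // Deletes log files for the prefix that were last written more than
    // maxAge before utcNow, and returns how many were removed.
    public static int RemoveOlderThan(
        string directory, string prefix, TimeSpan maxAge, DateTime utcNow)
    {
        var removed = 0;
        foreach (var file in Directory.GetFiles(directory, prefix + "-*.log"))
        {
            if (utcNow - File.GetLastWriteTimeUtc(file) > maxAge)
            {
                File.Delete(file);
                removed++;
            }
        }
        return removed;
    }
}
```

With this scheme a writer simply appends to PathFor(dir, "sent", DateTime.UtcNow), and a periodic call such as RemoveOlderThan(dir, "sent", TimeSpan.FromHours(48), DateTime.UtcNow) enforces the retention window.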

Get the NuGet package or check out the code on GitHub. Here’s some test code that demonstrates how these features work. (Update: the 1.2.2 package just published adds CountOutbound and CountInbound properties to allow you to determine whether your queues are getting backed up and whether more receiving or less sending should occur, based on the limits you choose.)

[TestMethod] //test of send while dest down
public void DestDownTest()
{
   var q1Address = new Address("qd1pipe");
   var q2Address = new Address("qd2pipe");
   using (var q1 = new MessageQueue("qd1", q1Address, @"c:\temp\qd1"))
   {
      q1.Send(q2Address, "hello world 1");
      Thread.Sleep(200); //destination not available
      q1.Send(q2Address, "hello world 2");
      // now fire up the destination - simulating dest down
      using (var q2 = new MessageQueue("qd2", q2Address, @"c:\temp\qd2"))
      {
         var msg = q2.Receive();
         Assert.IsNotNull(msg);
         Assert.AreEqual(msg.MessageString, "hello world 1");
         msg = q2.Receive();
         Assert.IsNotNull(msg);
         Assert.AreEqual(msg.MessageString, "hello world 2");
      }
   }
}

[TestMethod] //test of broadcast to multiple destinations
public void BroadcastTest()
{
   var q1Address = new Address("qb1pipe");
   var q2Address = new Address("qb2pipe");
   var q3Address = new Address("qb3pipe");
   var q4Address = new Address("qb4pipe");
   using (var q4 = new MessageQueue("qb4", q4Address, @"c:\temp\qb4"))
   using (var q3 = new MessageQueue("qb3", q3Address, @"c:\temp\qb3"))
   using (var q2 = new MessageQueue("qb2", q2Address, @"c:\temp\qb2"))
   using (var q1 = new MessageQueue("qb1", q1Address, @"c:\temp\qb1"))
   {
      q1.Broadcast(new [] 
         { 
            q2Address, 
            q3Address,
            q4Address
         }, "hello\r\nworld");
      var msg2 = q2.Receive();
      Assert.IsNotNull(msg2);
      Assert.AreEqual(msg2.MessageString, "hello\r\nworld");
      var msg3 = q3.Receive();
      Assert.IsNotNull(msg3);
      Assert.AreEqual(msg3.MessageString, "hello\r\nworld");
      var msg4 = q4.Receive();
      Assert.IsNotNull(msg4);
      Assert.AreEqual(msg4.MessageString, "hello\r\nworld");

      // confirm message received is the same
      Assert.AreEqual(msg2.Id, msg3.Id);
      Assert.AreEqual(msg3.Id, msg4.Id);
      Assert.AreEqual(msg2.Sent, msg3.Sent);
      Assert.AreEqual(msg3.Sent, msg4.Sent);
   }
}

If you find it helpful, please give me a shout. If you want a change, shout louder.

ServiceMq–A Peer to Peer Store and Forward Message Queue in C#

It’s a “catch up on blogging” weekend. Some months ago, while learning more about ZeroMq, I wrote and pushed to GitHub and NuGet a little library called ServiceMq, a peer-to-peer “store and forward” message queue library inspired by what I learned about ZeroMq and incorporating the ServiceWire library I had previously created.

ServiceMq is an experimental library at this point. I have not spent any time thoroughly testing or improving it since I created it. This is not because it’s not a cool project but only because my time has been limited by demands of the day job and family. One must have priorities. That’s what my wife says anyway.

So now with a brief moment of free time, I’m happy to share with you this little bit of work. Let me explain how it works and then I’ll share some test code here to illustrate. If you are interested in it, I urge you to get the NuGet package or clone the code and try it out and let me know if it has been useful to you.

It is also very important for me to mention that I pulled the entire ServiceStack.Text v3 code base into the library, renaming its namespaces for neatness, to serve as the serialization library used by ServiceMq. This enables fast and easy JSON serialization across the wire without burdening the user of the library with having to make any special accommodations in their message DTO classes. You need to know that after v3, the ServiceStack.Text library’s license changed, so if you plan to use it on your own, be aware of the change. The version I’ve used is 100% compatible with the Apache 2.0 license and derivative notice in the code on GitHub.

The test code below contains the only tests I’ve written for the project, and they cover only the primary use cases. The tests have both sender and receiver queues in a single process. In practice you would generally use the library in two processes to enable message passing between them.

The store and forward persistence of messages is important for this library as performance was less important than guaranteed sending and receiving. Scale and memory consumption were not addressed in this initial release.

Here’s the order of events on the sending end:

  • Send method first writes the message to a file.
  • Send method then tries to send to the intended recipient.
  • If the send fails, the message is placed on a “failed-retry” queue.
  • If the sending process fails or is shut down, all persisted messages are read back into memory when the process restarts and creates the message queue again.
  • When the message is successfully sent, the outbound message file is deleted after the message content is appended to a rolling outbound log so that an audit of messages sent is possible.

Now here’s the order of events on the receiving end:

  • The message queue receives a message and writes it to a file.
  • The queue’s Receive method is called, pulls a message off the queue when one becomes available, and calls the Acknowledge method (see more on Acknowledge below).
  • Or the queue’s Accept method is called and pulls a message off the queue when one becomes available but does NOT call the Acknowledge method. This is used by code that may fail to process the message, so the message is not actually removed from the inbound queue.
  • The Acknowledge method is called, either automatically in the Receive method, or manually after the Accept method is used. The Acknowledge method logs by appending the message to the inbound message log and deletes the individual message file.
  • If the receive process fails before the Acknowledge method is called to delete the message file and log it, the incoming queue will read it into memory prior to new messages arriving in order to guarantee order of delivery of the messages.
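
The receive-side bookkeeping described in the list above can be sketched in a few dozen lines. This is a simplified, single-threaded illustration and not the ServiceMq source: the InboundStore class, the file naming and the log file name are hypothetical, message bodies are plain strings, and Accept returns null instead of waiting when no message is available.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical sketch of persist-first receiving with Accept and Acknowledge.
public sealed class InboundStore
{
    private readonly string _dir;
    private readonly string _logPath;
    private readonly Queue<string> _pending = new Queue<string>(); // message file paths
    private long _sequence;

    public InboundStore(string dir)
    {
        _dir = dir;
        _logPath = Path.Combine(dir, "inbound.log");
        Directory.CreateDirectory(dir);
        // Reload unacknowledged messages left by a previous run, oldest first,
        // so they are delivered before anything that arrives later.
        foreach (var file in Directory.GetFiles(dir, "*.msg")
                                      .OrderBy(f => f, StringComparer.Ordinal))
        {
            _pending.Enqueue(file);
            _sequence = Math.Max(_sequence,
                long.Parse(Path.GetFileNameWithoutExtension(file)));
        }
    }

    // Called when a message arrives: write it to a file before queuing it.
    public void Enqueue(string body)
    {
        var file = Path.Combine(_dir, (++_sequence).ToString("D19") + ".msg");
        File.WriteAllText(file, body);
        _pending.Enqueue(file);
    }

    // Returns the oldest message without removing it; null if none is waiting.
    public string Accept()
    {
        return _pending.Count == 0 ? null : File.ReadAllText(_pending.Peek());
    }

    // Appends the oldest message to the inbound log and deletes its file.
    public void Acknowledge()
    {
        var file = _pending.Dequeue();
        File.AppendAllText(_logPath, File.ReadAllText(file) + Environment.NewLine);
        File.Delete(file);
    }

    // Receive is Accept followed immediately by Acknowledge.
    public string Receive()
    {
        var body = Accept();
        if (body != null) Acknowledge();
        return body;
    }
}
```

The zero-padded sequence number in each file name is what lets a restarted process recover the original arrival order from a plain directory listing.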

Now here’s the test code that shows how each end works:

[TestMethod]
public void SimpleTest()
{
    var q1Address = new Address("q1pipe");
    var q2Address = new Address("q2pipe");
    using (var q2 = new MessageQueue("q2", q2Address, @"c:\temp\q2"))
    using (var q1 = new MessageQueue("q1", q1Address, @"c:\temp\q1"))
    {
        q1.Send(q2Address, "hello world");
        var msg = q2.Receive();
        Assert.IsNotNull(msg);
        Assert.AreEqual(msg.MessageString, "hello world");
    }
}

[TestMethod]
public void SimpleTcpTest()
{
    var q1Address = new Address(Dns.GetHostName(), 8967);
    var q2Address = new Address(Dns.GetHostName(), 8968);
    using (var q2 = new MessageQueue("q2", q2Address, @"c:\temp\q2"))
    using (var q1 = new MessageQueue("q1", q1Address, @"c:\temp\q1"))
    {
        q1.Send(q2Address, "hello world");
        var msg = q2.Receive();
        Assert.IsNotNull(msg);
        Assert.AreEqual(msg.MessageString, "hello world");
    }
}

[TestMethod]
public void SimpleObjectTest()
{
    var q1Address = new Address("q6pipe");
    var q2Address = new Address("q8pipe");
    using (var q2 = new MessageQueue("q8", q2Address, @"c:\temp\q8"))
    using (var q1 = new MessageQueue("q6", q1Address, @"c:\temp\q6"))
    {
        int[] data = new int[] { 4, 8, 9, 24 };
        q1.Send(q2Address, data);
        Message msg = q2.Receive();
        Assert.IsNotNull(msg);
        var data2 = msg.To<int[]>();
        Assert.AreEqual(data[1], data2[1]);
    }
}

[TestMethod]
public void SimpleBinaryTest()
{
    var q1Address = new Address("q3pipe");
    var q2Address = new Address("q4pipe");
    using (var q2 = new MessageQueue("q4", q2Address, @"c:\temp\q4"))
    using (var q1 = new MessageQueue("q3", q1Address, @"c:\temp\q3"))
    {
        byte[] data = new byte[] { 4, 8, 9, 24 };
        q1.SendBytes(q2Address, data, "mybytestest");
        Message msg = null;
        while (true)
        {
            msg = q2.Receive();
            if (msg.MessageBytes != null) break;
        }
        Assert.IsNotNull(msg);
        Assert.AreEqual(msg.MessageBytes.Length, 4);
        Assert.AreEqual(msg.MessageBytes[2], (byte)9);
        Assert.AreEqual(msg.MessageTypeName, "mybytestest");
    }
}

I’m certain the code base needs work and needs to be tested under load and limited memory circumstances. Perhaps even a caching strategy needs to be implemented for scenarios where message volume is very high. I look forward to your feedback.