tsJensen

A quest for software excellence...

ServiceWire and ServiceMq Improvements

Over the past several days, I’ve been working on a day-job project that may use ServiceMq, which uses ServiceWire under the covers. I say “may” because it depends on whether the prototype proves to be sufficiently reliable and efficient. The prototype is really more of an extended integration test with the following requirements:

  • Blocking Send method that throws if sending fails.
    • Tries the primary destination first.
    • Alternative destinations are tried successively until the message is successfully sent.
    • Control over send connection timeout failure to enable “fail fast.”
  • Standard caching receive.

These requirements exist because the senders are transient and may not be restarted should they fail. The senders also need immediate feedback because the action is part of a transaction involving other operations.
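The send requirements above (primary first, then alternates in order, throw if nothing accepts the message) can be sketched roughly as follows. This is only an illustration of the control flow, not the ServiceMq implementation: the class name FailoverSketch, the method SendWithFailover, and the trySend delegate are all hypothetical, and the choice of AggregateException is an assumption made for the sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; not part of ServiceMq.
public static class FailoverSketch
{
    // Tries the primary destination first, then each alternate in order.
    // Returns the destination that accepted the message.
    // Throws AggregateException (an assumption of this sketch) when every
    // destination fails, so the caller gets immediate feedback.
    public static string SendWithFailover(
        string message,
        Action<string, string> trySend,   // (destination, message); throws on failure
        string primary,
        params string[] alternates)
    {
        var destinations = new List<string> { primary };
        destinations.AddRange(alternates);

        var errors = new List<Exception>();
        foreach (var destination in destinations)
        {
            try
            {
                trySend(destination, message);
                return destination;       // first success wins
            }
            catch (Exception e)
            {
                errors.Add(e);            // remember why this one failed
            }
        }
        throw new AggregateException("All destinations failed.", errors);
    }
}
```

Because the method blocks until one destination succeeds or all have failed, the caller can roll back the surrounding transaction as soon as the exception surfaces.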

The first order of business was to add the Flash class to ServiceMq. This evolved to become the Flasher class which implements IDisposable in order to take advantage of client connection pooling using the updated PooledDictionary class in ServiceWire (more on this later). The Flasher’s Send method allows you to send a message to a primary destination with zero to many alternate destinations.

[TestMethod]
public void FlashDestDownTcpTest()
{
   var qfrom = new Address(Dns.GetHostName(), 8966);
   var q1Address = new Address(Dns.GetHostName(), 8967);
   var q2Address = new Address(Dns.GetHostName(), 8968);

   // create and use Flasher in using to guarantee Dispose call
   using (var flash = new Flasher(qfrom))
   {
      // create a receiving queue
      using (var q2 = new MessageQueue("qf2", q2Address, @"c:\temp\qf2"))
      {
         // send to primary which does not exist - received on secondary q2
         var id = flash.Send(q1Address, "my test message", q2Address);
         var msg = q2.Receive();
         Assert.IsTrue(msg.Id == id);
      }

      using (var q1 = new MessageQueue("qf1", q1Address, @"c:\temp\qf1"))
      {
         // send to primary - received on primary
         var id = flash.Send(q1Address, "my test message", q2Address);
         var msg = q1.Receive();
         Assert.IsTrue(msg.Id == id);
      }

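      // demonstrate Send throws when neither receiver is "up"
      try
      {
         var id = flash.Send(q1Address, "my test message", q2Address);
         // without this, the test would pass silently if Send did not throw
         Assert.Fail("Send should throw when no destination is reachable");
      }
      catch (System.Net.WebException)
      {
         // expected: neither q1 nor q2 is listening
      }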
   }
}

In order to support a more robust client-side connection timeout, a significant improvement was made to ServiceWire. The TcpEndPoint class was introduced and an overloaded constructor was added to TcpChannel, which allows you to specify a connection timeout value when creating an instance of TcpClient<T> (see below). This involved use of the Socket class’s ConnectAsync method with a SocketAsyncEventArgs object.

private void Initialize(Type serviceType, 
   IPEndPoint endpoint, int connectTimeoutMs)
{
   _serviceType = serviceType;
   _client = new Socket(AddressFamily.InterNetwork, 
                        SocketType.Stream, ProtocolType.Tcp);
   _client.LingerState.Enabled = false;

   var connected = false;
   var connectEventArgs = new SocketAsyncEventArgs
   {
      // must designate the server you want to connect to
      RemoteEndPoint = endpoint
   };
   connectEventArgs.Completed 
      += new EventHandler<SocketAsyncEventArgs>((sender, e) =>
         {
            connected = true;
         });

   if (_client.ConnectAsync(connectEventArgs))
   {
      //operation pending - (false means completed synchronously)
      while (!connected)
      {
         if (!SpinWait.SpinUntil(() => connected, connectTimeoutMs))
         {
            if (null != _client) _client.Dispose();
            throw new TimeoutException("Unable to connect within " 
                                       + connectTimeoutMs + "ms");
         }
      }
   }
   if (connectEventArgs.SocketError != SocketError.Success)
   {
      if (null != _client) _client.Dispose();
      throw new SocketException((int)connectEventArgs.SocketError);
   }

   if (!_client.Connected) throw new SocketException(); 
   _stream = new BufferedStream(new NetworkStream(_client), 8192);
   _binReader = new BinaryReader(_stream);
   _binWriter = new BinaryWriter(_stream);
   SyncInterface(_serviceType);
}
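For comparison, a similar "fail fast" connect can be sketched with the Task-returning Socket.ConnectAsync(EndPoint) overload instead of SocketAsyncEventArgs. This is a different technique from the one ServiceWire uses above, shown only to make the timeout idea concrete. The class ConnectTimeoutSketch and its Connect method are hypothetical names, and the sketch assumes a runtime where the Task-based overload is available.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

// Hypothetical helper; not part of ServiceWire.
public static class ConnectTimeoutSketch
{
    // Returns a connected socket or throws.
    // Throws TimeoutException when the connect does not finish in time.
    // A failed connect (for example, connection refused) surfaces as an
    // AggregateException from Task.Wait, wrapping the SocketException.
    public static Socket Connect(IPEndPoint endpoint, int connectTimeoutMs)
    {
        var socket = new Socket(endpoint.AddressFamily,
                                SocketType.Stream, ProtocolType.Tcp);
        try
        {
            var pending = socket.ConnectAsync(endpoint);
            if (!pending.Wait(connectTimeoutMs))
            {
                throw new TimeoutException("Unable to connect within "
                                           + connectTimeoutMs + "ms");
            }
            return socket;
        }
        catch
        {
            // disposing the socket abandons any connect still in flight
            socket.Dispose();
            throw;
        }
    }
}
```

Blocking on the task avoids the spin loop, at the cost of wrapping connect failures in an AggregateException that the caller has to unwrap.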

The final problem to solve was TCP/IP port exhaustion. My original implementation had the sending client being created and taken down with each call to the Send method. The construction overhead is minimal, but the connect time and port exhaustion quickly become a problem when there are potentially many threads sending messages from the same client machine.

To solve the problem, I used a connection pooling strategy that involved making the PooledDictionary<TKey, TValue> implement the IDisposable interface to allow for easy disposal of pooled client objects. The Send method then uses one of two client pools, depending on whether it is a local Named Pipes connection or a TCP connection.

private void SendMessage(OutboundMessage message)
{
   NpClient<IMessageService> npClient = null;
   TcpClient<IMessageService> tcpClient = null;
   IMessageService proxy = null;
   var poolKey = message.To.ToString();
   try
   {
      // determine whether to use NamedPipes or Tcp
      var useNpClient = false;
      if (message.To.Transport == Transport.Both)
      {
         if (message.To.ServerName == message.From.ServerName)
         {
            useNpClient = true;
         }
      }
      else if (message.To.Transport == Transport.Np) useNpClient = true;

      // request a client from the pool, providing a way to create a new one
      if (useNpClient)
      {
         npClient = npClientPool.Request(poolKey, 
            () => new NpClient<IMessageService>(
               new NpEndPoint(message.To.PipeName, _connectTimeOutMs)));
         proxy = npClient.Proxy;
      }
      else
      {
         tcpClient = tcpClientPool.Request(poolKey,
            () => new TcpClient<IMessageService>(new TcpEndPoint(
               new IPEndPoint(IPAddress.Parse(message.To.IpAddress), 
                  message.To.Port), _connectTimeOutMs)));
         proxy = tcpClient.Proxy;
      }

      // send the message via the proxy RPC Enqueue* method
      if (null == message.MessageBytes)
      {
         proxy.EnqueueString(message.Id, message.From.ToString(), 
            message.Sent, message.SendAttempts,
            message.MessageTypeName, message.MessageString);
      }
      else
      {
         proxy.EnqueueBytes(message.Id, message.From.ToString(), 
            message.Sent, message.SendAttempts,
            message.MessageTypeName, message.MessageBytes);
      }
      }
   }
   finally
   {
      if (null != tcpClient) tcpClientPool.Release(poolKey, tcpClient);
      if (null != npClient) npClientPool.Release(poolKey, npClient);
   }
}
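The Request/Release pattern used above can be illustrated with a minimal keyed pool. This is a sketch of the general idea only, not ServiceWire's PooledDictionary: the KeyedPool class is hypothetical, it has no size limit or idle timeout, and it is not hardened against a Release call racing with Dispose.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical minimal keyed pool; not ServiceWire's PooledDictionary.
public sealed class KeyedPool<TKey, TValue> : IDisposable
    where TValue : class, IDisposable
{
    // one bag of idle items per key (for example, per destination address)
    private readonly ConcurrentDictionary<TKey, ConcurrentBag<TValue>> _idle
        = new ConcurrentDictionary<TKey, ConcurrentBag<TValue>>();
    private volatile bool _disposed;

    // Hands out an idle item for the key, or creates a new one.
    public TValue Request(TKey key, Func<TValue> create)
    {
        if (_disposed) throw new ObjectDisposedException("KeyedPool");
        var bag = _idle.GetOrAdd(key, _ => new ConcurrentBag<TValue>());
        TValue item;
        return bag.TryTake(out item) ? item : create();
    }

    // Returns an item to the pool so a later Request can reuse it.
    public void Release(TKey key, TValue item)
    {
        if (_disposed) { item.Dispose(); return; }
        _idle.GetOrAdd(key, _ => new ConcurrentBag<TValue>()).Add(item);
    }

    // Disposes every idle item; items still checked out are disposed
    // when they are released afterwards.
    public void Dispose()
    {
        _disposed = true;
        foreach (var bag in _idle.Values)
        {
            TValue item;
            while (bag.TryTake(out item)) item.Dispose();
        }
    }
}
```

Keying the pool by destination means a connection is only ever reused for the address it was opened against, and releasing in a finally block, as SendMessage does, returns the client to the pool even when the send throws.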

It remains to be seen whether these changes will result in a sufficiently robust message passing and queuing system to allow it to be used on my day-job project. More testing and prototyping are required. There are alternatives to which I can fall back, but none of them are trivial and all of them are less desirable. Given my personal bias, I must take extra care to scrutinize this possible solution and abandon it should it prove to be insufficient.