tsJensen

A quest for software excellence...

ServiceWire 5.0 with .NET Core Support Released

I’m happy to announce the release of ServiceWire 5.0 with .NET Core support, published to NuGet today. The final trick was preparing the NuGet package, which was greatly helped by a post from Armen Shimoon of dotnetliberty on using project.json for NuGet package metadata. The post was written in January 2016, so it was a bit out of date. The project.json file ended up like this:

{
  "name": "ServiceWire",
  "title": "ServiceWire",
  "authors": [ "Tyler Jensen" ],
  "description": "ServiceWire is a very fast...",
  "projectUrl": "https://github.com/tylerjensen/ServiceWire",
  "packOptions": {
    "iconUrl": "http://www.tsjensen.com/blog/image.axd?picture=2014/11/swlogo_sm.png",
    "licenseUrl": "http://www.apache.org/licenses/LICENSE-2.0",
    "copyright": "Tyler Jensen 2013-2016",
    "owners": [ "Tyler Jense" ],
    "summary": "ServiceWire is a fast and easy RPC library...",
    "releaseNotes": "BREAKING CHANGES: Ported to .NET Core netstandard1.6...",
    "tags": [ "WCF", "Services", "Host", "Client", "..." ]
  },
  "version": "5.0.0-*",

  "dependencies": {
    "NETStandard.Library": "1.6.0",
    "Newtonsoft.Json": "9.0.1"
  },

  "frameworks": {
    "netstandard1.6": {
      "imports": "dnxcore50",
      "dependencies": {
        "System.Reflection.Emit": "4.0.1",
        "System.Threading.Thread": "4.0.0"
      }
    },
    "net45": {
      "dependencies": {
        "System.Reflection.Emit": "4.0.1"
      },
      "frameworkAssemblies": {
        "System.Management": "4.0.0.0"
      }
    },
    "net461": {
      "dependencies": {
        "System.IO.Pipes": "4.0.0",
        "System.Reflection.Emit": "4.0.1",
        "System.Threading.Thread": "4.0.0"
      },
      "frameworkAssemblies": {
        "System.Management": "4.0.0.0"
      }
    },
    "net40": {
      "frameworkAssemblies": {
        "System.Management": "4.0.0.0"
      }
    },
    "net35": {
      "dependencies": {
        "TaskParallelLibrary": "1.0.2856"
      }
    }
  }
}

Then packaging up the NuGet package was easy with this command:

dotnet pack ServiceWire.Core -c Release -o D:\NugetPackages

Then just push it to NuGet. Simple as that.

And this concludes the porting of ServiceWire to .NET Core. But there’s more. There is always more. Some features were disabled in the .NET Standard 1.6 build, so a bit of utility work is still required. There is some fun work ahead to make the library even easier to use. And then there is some performance work that ought to be done, along with some new unit tests using xUnit.

If you enjoy using ServiceWire, I’d love to hear from you. And I would love to have more pull requests from those who find bugs or ways to improve it.

.NET Core on Linux with ServiceWire

After a BIOS update, I was able to get Hyper-V working on my Windows 10 Pro machine and spun up an Ubuntu 16.04 instance. Once it was up and running I just followed the .NET Core Linux install instructions. No other changes to the VM.

Using the Portable Application instructions on the new docs.microsoft.com site for .NET Core, I executed the following commands to prep the CoreTestClient1 project for deployment to the Linux VM with my command prompt in the root directory of the project.

dotnet restore

and then

dotnet publish -f netcoreapp1.0 -c release

From there it was just a matter of copying the files in the \bin\Release\netcoreapp1.0\publish folder to a new folder called test1 on the Linux VM using an smb local file share.

After spinning up a debug instance of CoreTestHost on my Windows machine in Visual Studio, I tried to run CoreTestClient1 in the Ubuntu VM, first without the file extension. Oops. Then I got it right, and then ran it again just out of sheer delight.

[Screenshot: linuxdotnet]

The results are not particularly impressive, as I was running on a VM with a single core, but the fact that it ran without a hitch was enough to make me very happy. It took a single day of porting to .NET Core and one more to remember how to install and use Linux on a Windows Hyper-V virtual machine, with not a single bug that had to be fixed. With that, I was able to complete the following essential requirements using only .NET Core, C# and ServiceWire:

Requirements

  • Write an interface in a class library project called Common.
  • Write an implementation for that interface in a class library project called Impl.
  • Write a console app that will host the implementation in a project called Host.
    • This project may reference Common and Impl.
    • Using ServiceWire allow remote calls to the implementation of the interface.
  • Write a console app in a project called Client.
    • This project may only reference Common.
    • Using ServiceWire, connect to Host and call the methods on the interface.
    • Write the results to the console.
  • Write all of this code on a single Windows machine with Visual Studio.
  • Run the Host project on the Windows machine.
  • Deploy the Client project to a Linux machine and run it from the Linux shell.
  • No tricks. You may not use Mono.

For examples of the code that you might find in the Host and Client, have a look at my previous post.

What’s Next?

Next I will be working on producing a NuGet package for ServiceWire 5.0.

Final Confession

It wasn’t quite all that smooth, as I had not touched Linux or Hyper-V in years, so there were a few bumps, including a hosed up virtual network adapter that left my first Ubuntu install crippled. I also tried building and deploying a self-contained app, which required changes to the project.json file, but I gave it up as folly since I had already installed .NET Core on the Linux VM. And after deploying the portable app the first time, I was trying to run the app with “dotnet run” rather than “dotnet {assemblyFileName}” and of course that did not work. All told, I spent about 6 to 8 hours on all of this, but that was broken up by multiple distractions, so it required two calendar days. Now that the learning curve has been climbed, the next time out of the box should be much easier. And I hope this post will help you. I know it will serve well as a bookmark for me the next time I climb this curve. And that should be soon.

ServiceWire and .NET Core Integration Tests

I’m very happy with the quick and dirty .NET Core integration test code I’ve just committed to GitHub for ServiceWire. The only changes to existing integration test code were these:

  • Removal of Named Pipes from the test since .NET Core does not support Named Pipes (so far as I can learn) as Named Pipes is a Windows only thing (again, so far as I have learned).
  • Modified code to get IP address and port from command line args rather than configuration along with some defaults.

So here’s the primary host code. Note how easy it is to host implementations for multiple interfaces.

class Program
{
   static void Main(string[] args)
   {
      var logger = new Logger(logLevel: LogLevel.Debug);
      var stats = new Stats();

      var addr = new[] { "127.0.0.1", "8098" }; //defaults
      if (null != args && args.Length > 0)
      {
         var parts = args[0].Split(':');
         if (parts.Length > 1) addr[1] = parts[1];
         addr[0] = parts[0];
      }

      var ip = addr[0];
      var port = Convert.ToInt32(addr[1]);
      var ipEndpoint = new IPEndPoint(IPAddress.Any, port);

      var useCompression = false;
      var compressionThreshold = 131072; //128KB

      var tester = new NetTester();
      var mytester = new MyTester();

      var tcphost = new TcpHost(ipEndpoint, logger, stats);
      tcphost.UseCompression = useCompression;
      tcphost.CompressionThreshold = compressionThreshold;
      tcphost.AddService<INetTester>(tester);
      tcphost.AddService<IMyTester>(mytester);

      var valTypes = new ValTypes();
      tcphost.AddService<IValTypes>(valTypes);

      tcphost.Open();

      Console.WriteLine("Press Enter to stop the dual host test.");
      Console.ReadLine();

      tcphost.Close();

      Console.WriteLine("Press Enter to quit.");
      Console.ReadLine();
   }
}

And here’s the client code.

class Program
{
   private static void Main(string[] args)
   {
      var addr = new[] { "127.0.0.1", "8098" }; //defaults
      if (null != args && args.Length > 0)
      {
         var parts = args[0].Split(':');
         if (parts.Length > 1) addr[1] = parts[1];
         addr[0] = parts[0];
      }

      var ip = addr[0];
      var port = Convert.ToInt32(addr[1]);
      var ipEndpoint = new IPEndPoint(IPAddress.Parse(ip), port);
      for (int i = 0; i < 1; i++) RunTest(ipEndpoint, ip);

      Console.ReadLine();
   }

   private static void RunTest(IPEndPoint ipEndpoint, string ip)
   {
      using (var client = new TcpClient<IValTypes>(ipEndpoint))
      {
         decimal abc = client.Proxy.GetDecimal(4.5m);
         bool result = client.Proxy.OutDecimal(abc);
      }

      using (var client = new NetTcpTesterProxy(ipEndpoint))
      {
         var id = client.GetId("test1", 3.314, 42, DateTime.Now);
         long q = 3;
         var response = client.Get(id, "mirror", 4.123, out q);
         var list = client.GetItems(id);
      }
      using (var client = new NetTcpMyTesterProxy(ipEndpoint))
      {
         var id = client.GetId("test1", 3.314, 42);
         int q2 = 4;
         var response = client.Get(id, "mirror", 4.123, out q2);
         var list = client.GetItems(id, new int[] { 3, 6, 9 });
      }

      var sw = Stopwatch.StartNew();
      var from = 0;
      var to = 400;
      Parallel.For(from, to, index =>
      {
         using (var client = new NetTcpTesterProxy(ipEndpoint))
         {
            for (int i = 0; i < 10; i++)
            {
               var id = client.GetId("test1", 3.314, 42, DateTime.Now);
               long q = 2;
               var response = client.Get(id, "mirror", 4.123, out q);
               var list = client.GetItems(id);
            }
         }

         using (var client = new NetTcpMyTesterProxy(ipEndpoint))
         {
            for (int i = 0; i < 10; i++)
            {
               var id = client.GetId("test1", 3.314, 42);
               int q2 = 6;
               var response = client.Get(id, "mirror", 4.123, out q2);
               var list = client.GetItems(id, new int[] { 3, 6, 9 });
            }
         }
      });
      sw.Stop();
      var msperop = sw.ElapsedMilliseconds / 24000.0;
      Console.WriteLine("tcp: {0}, {1}", sw.ElapsedMilliseconds, msperop);
   }
}
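Both programs above repeat the same argument parsing. As a minimal sketch, the shared logic could be pulled into one helper that falls back to the same defaults. The `EndpointArgs` class and its `Parse` method are hypothetical names for illustration and are not part of ServiceWire. Note that the host above binds to `IPAddress.Any` and uses only the port, so it would read just the `Port` of the result.

```csharp
using System;
using System.Globalization;
using System.Net;

// Hypothetical helper, not part of ServiceWire.
public static class EndpointArgs
{
    // Accepts "ip" or "ip:port" as the first argument; anything missing
    // falls back to the defaults used in the test programs.
    public static IPEndPoint Parse(string[] args,
        string defaultIp = "127.0.0.1", int defaultPort = 8098)
    {
        var ip = defaultIp;
        var port = defaultPort;
        if (args != null && args.Length > 0)
        {
            var parts = args[0].Split(':');
            ip = parts[0];
            if (parts.Length > 1)
            {
                port = int.Parse(parts[1], CultureInfo.InvariantCulture);
            }
        }
        return new IPEndPoint(IPAddress.Parse(ip), port);
    }
}
```

With this in place the client could build its endpoint with `EndpointArgs.Parse(args)` and pass it straight to `RunTest`.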

It’s quite simple, but I’m going to be working on both host and client code to make it easier to code up both of them with some convention assumptions that will also not break existing host and client code. After that, I’ll publish a new NuGet package with a major version rev to 5.0.

ServiceWire on .NET Core

With a few days of free time on my hands, I’ve picked up an old open source project of mine and kicked off a .NET Core port of ServiceWire, which still needs testing and much more refactoring, I'm sure. Just getting everything to build was a challenge, especially in the area of Reflection. Here’s the bridge code I wrote to avoid making too many #if blocks in the code.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Reflection.Emit;
using System.Threading.Tasks;

namespace ServiceWire
{
   public static class TypeExtensions
   {
      public static Type BaseType(this Type t)
      {
#if NETSTANDARD1_6
         return t.GetTypeInfo().BaseType;
#else
         return t.BaseType;
#endif
      }

      public static bool IsInterface(this Type t)
      {
#if NETSTANDARD1_6
         return t.GetTypeInfo().IsInterface;
#else
         return t.IsInterface;
#endif
      }

#if NETSTANDARD1_6
      public static MethodInfo[] GetMethods(this Type t)
      {
         return t.GetTypeInfo().GetMethods();
      }
#endif

#if NETSTANDARD1_6
      public static Type[] GetInterfaces(this Type t)
      {
         return t.GetTypeInfo().GetInterfaces();
      }
#endif

#if NETSTANDARD1_6
      public static Type CreateType(this TypeBuilder b)
      {
         return b.CreateTypeInfo().AsType();
      }
#endif

#if NETSTANDARD1_6
      public static ConstructorInfo GetConstructor(this Type t, Type[] ctorArgTypes)
      {
         return t.GetTypeInfo().GetConstructor(ctorArgTypes);
      }
#endif

#if NETSTANDARD1_6
      public static MethodInfo GetMethod(this Type t, string invokeMethod, BindingFlags flags)
      {
         return t.GetTypeInfo().GetMethod(invokeMethod, flags);
      }

      public static MethodInfo[] GetMethods(this Type t, BindingFlags flags)
      {
         return t.GetTypeInfo().GetMethods(flags);
      }

      public static PropertyInfo[] GetProperties(this Type t, BindingFlags flags)
      {
         return t.GetTypeInfo().GetProperties(flags);
      }

#endif

      public static bool IsValueType(this Type t)
      {
#if NETSTANDARD1_6
         return t.GetTypeInfo().IsValueType;
#else
         return t.IsValueType;
#endif
      }

      public static Type[] GetGenericArguments(this PropertyInfo pi)
      {
#if NETSTANDARD1_6
         return pi.PropertyType.GetTypeInfo().GetGenericArguments();
#else
         return pi.PropertyType.GetGenericArguments();
#endif
      }

      public static bool IsGenericType(this Type t)
      {
#if NETSTANDARD1_6
         return t.GetTypeInfo().IsGenericType;
#else
         return t.IsGenericType;
#endif
      }

      public static bool IsPrimitive(this Type t)
      {
#if NETSTANDARD1_6
         return t.GetTypeInfo().IsPrimitive;
#else
         return t.IsPrimitive;
#endif
      }

   }
}

It’s a quick and dirty implementation that needs more work. The split of Type into Type and TypeInfo classes in .NET Core is a pain but I can see why they would want to eliminate Reflection except where it is absolutely necessary.
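To make the split concrete, here is a small sketch of what the NETSTANDARD1_6 branches of the bridge do: every question about a type goes through `GetTypeInfo()`, the extension method in `System.Reflection`. The `TypeInfoDemo` class and `Describe` method are made-up names for illustration only.

```csharp
using System;
using System.Reflection;

// Illustration only: shows the TypeInfo route that the bridge code hides.
public static class TypeInfoDemo
{
    public static string Describe(Type t)
    {
        // On netstandard1.6 these members live on TypeInfo, not on Type.
        var info = t.GetTypeInfo();
        var kind = info.IsInterface ? "interface"
                 : info.IsValueType ? "value type"
                 : "class";
        var baseName = info.BaseType == null ? "none" : info.BaseType.Name;
        return t.Name + ": " + kind + ", base " + baseName;
    }
}
```

The extension methods in the bridge exist so that the rest of the library can keep asking the `Type` directly and never see this detour.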

Very Large Pseudo Random Number Generator in C#

Thanks to a friend of mine, over the past few weeks the idea of zero knowledge authentication, which has been around for a long time but to which I’d never paid any attention, has been percolating in my head. Finally on Thursday (Thanksgiving holiday here in the U.S.), I dove in and added it to ServiceWire.

Now you can secure your ServiceWire RPC calls without PKI or exchanging keys or exposing a password across the wire. It validates on the server that the client knows the secret key (password) and on the client that the server knows the secret key. It creates a shared encryption key that is used for that one connection session and is never passed across the wire, allowing all subsequent communications between client and server to be encrypted with strong and fast symmetric encryption, denying any man in the middle from access to the data or even the service definition.

But this post is not about the entire zero knowledge addition to ServiceWire. Instead I want to share one part of that addition which you may use to create a very large random number. In this case, 512 bytes (that’s 4096 bits for those of you who can only count to 1). If you want a larger random number, you’ll need a larger safe prime from a Sophie Germain prime. In ServiceWire, I only include the 512 byte safe prime. There are a number of sources for larger safe primes freely available on the web.

This is not an algorithm entirely of my invention. It is based on several different algorithms I studied while investigating zero knowledge secure remote password protocols. I did take some liberties by adding a randomly selected smaller safe prime as the exponent in the algorithm.

And here’s the code:

using System;
using System.Linq;
using System.Numerics;
using System.Security.Cryptography;

public class ZkProtocol
{
   private readonly SHA256 _sha;
   private readonly Random _random;
   private readonly BigInteger _n;

   public ZkProtocol()
   {
      _sha = SHA256.Create(); //used in other protocol methods
      _random = new Random(DateTime.Now.Millisecond);
      _n = new BigInteger(ZkSafePrimes.N4);
   }
   
   //...other protocol methods excluded for post

   /// <summary>
   /// Generate crypto safe, pseudo random number.
   /// </summary>
   /// <param name="bits">max value supported is 4096</param>
   /// <returns></returns>
   public byte[] CryptRand(int bits = 4096)
   {
      var rb = new byte[256];
      _random.NextBytes(rb);
      var bigrand = new BigInteger(rb);
      var crand = BigInteger.ModPow(bigrand, 
         ZkSafePrimes.GetSafePrime(_random.Next(0, 2047)), _n);
      var bytes = crand.ToByteArray();
      if (bits >= 4096) return bytes;
      var count = bits / 8;
      var skip = _random.Next(0, bytes.Length - count);
      return bytes.Skip(skip).Take(count).ToArray();
   }
}

internal static class ZkSafePrimes
{
   internal static byte[] N4
   {
      get { return Safe4096BitPrime; }
   }

   private static byte[] Safe4096BitPrime = new byte[]
   {
      //data not shown here for post (see on GitHub)
   };
   
   public static int GetSafePrime(int index)
   {
      if (index < 1) index = 1;
      else if (index > 2047) index = 2047;
      else if (index % 2 == 0) index++;
      return _safePrimes[index];
   }

   //2048 values, every second value is a safe prime
   //the previous value is the Sophie Germain prime
   //as in q is the Sophie Germain and N = 2q+1 is safe
   private static int[] _safePrimes = new[]
   {
      5903,11807,8741,17483 //first four values only - see GitHub
   };
}

See the code on GitHub. Look specifically at the ZkProtocol and ZkSafePrimes classes.

I should note that in ServiceWire a new instance of ZkProtocol is used for each connection, so the Random object should provide a healthy random seed to the algorithm given the clock's milliseconds seed on construction. If you find this useful, I would love to hear from you.
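One thing worth knowing about that seed: `DateTime.Now.Millisecond` only takes values from 0 to 999. If that matters for your use, a variation is to draw the starting bytes from `System.Security.Cryptography.RandomNumberGenerator` instead. The sketch below is not the ServiceWire code. `CryptRandSketch` and `NextBytes` are hypothetical names, and the caller supplies the modulus and exponent (in ServiceWire these would be the 4096 bit safe prime and one of the small safe primes).

```csharp
using System;
using System.Numerics;
using System.Security.Cryptography;

// Hypothetical variation on CryptRand, not the ServiceWire implementation.
public static class CryptRandSketch
{
    public static byte[] NextBytes(BigInteger modulus, int exponent,
        int seedBytes = 256)
    {
        var seed = new byte[seedBytes];
        using (var rng = RandomNumberGenerator.Create())
        {
            rng.GetBytes(seed); // OS-provided randomness instead of System.Random
        }

        // BigInteger reads bytes little-endian and signed, so one extra
        // zero byte at the end keeps the starting value non-negative.
        var raw = new byte[seedBytes + 1];
        Buffer.BlockCopy(seed, 0, raw, 0, seedBytes);

        // Same shape as the original: value ^ exponent mod modulus.
        var value = BigInteger.ModPow(new BigInteger(raw), exponent, modulus);
        return value.ToByteArray();
    }
}
```

Because the starting value is never negative, the result always lands between zero and the modulus, which keeps the byte array length predictable for a given modulus size.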

ServiceMq Hits 10,000 Downloads

I am pleased to see this milestone of 10,000 downloads in the short history of ServiceMq and its underlying communication library ServiceWire, a faster and simpler alternative to WCF for .NET to .NET RPC. And the source code for all three can be found on GitHub here.

[Image: smq10k]

Over the past few weeks both libraries have been improved.

ServiceMq improvements include:

  • Options for the persistence of messages asynchronously to improve overall throughput when message traffic is high
  • ReceiveBulk and AcceptBulk methods were introduced
  • Message caching was refactored to improve performance and limit memory use in scenarios where large numbers of messages are sent and must wait for a destination to become available or received and must wait to be consumed
  • Faster asynchronous file deletion was added which eliminates the standard File.Delete’s permission demand on every message file delete
  • Asynchronous append file logging was added to improve throughput
  • The FastFile class was refactored to support IDisposable and now dedicates a single thread each to asynchronous delete, append and write operations
  • Upgraded to ServiceWire 1.6.3
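The idea of dedicating one thread to file deletes can be sketched with nothing but the base class library. This is not the FastFile class from ServiceMq. `BackgroundFileDeleter` is a hypothetical name, and the sketch uses the ordinary `File.Delete` call rather than the faster delete mentioned in the list above.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical sketch: callers queue a path and return at once,
// while a single worker thread performs the deletes in order.
public sealed class BackgroundFileDeleter : IDisposable
{
    private readonly BlockingCollection<string> _paths =
        new BlockingCollection<string>();
    private readonly Thread _worker;

    public BackgroundFileDeleter()
    {
        _worker = new Thread(DeleteLoop) { IsBackground = true };
        _worker.Start();
    }

    public void Delete(string path)
    {
        _paths.Add(path);
    }

    private void DeleteLoop()
    {
        // Blocks until items arrive; ends once CompleteAdding is called
        // and the queue has been drained.
        foreach (var path in _paths.GetConsumingEnumerable())
        {
            try { File.Delete(path); }
            catch (IOException) { /* file in use; skipped in this sketch */ }
            catch (UnauthorizedAccessException) { /* skipped in this sketch */ }
        }
    }

    public void Dispose()
    {
        _paths.CompleteAdding();
        _worker.Join(); // wait for pending deletes to finish
        _paths.Dispose();
    }
}
```

Disposing the deleter drains the queue first, so nothing queued before `Dispose` is left behind.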

ServiceWire has had two minor but important bugs fixed:

  • Code was refactored to properly dispose of resources when a connection failure occurs.
  • Previously if the host was not hosting the same assembly version of the interface being used, the connection would hang. This scenario now properly throws an identifiable exception on the client and disposes of the underlying socket or named pipe stream.

Real World Use

In the last month or so, I have had the opportunity to use both of these libraries extensively at work. All of the recent improvements are a direct or indirect result of that real world use. Without disclosing work related details, I believe it is safe to say that these libraries are moving hundreds of messages per second and in some cases 30GB of data between two machines in around three minutes across perhaps 300 RPC method invocations. Some careful usage has been required given our particular use cases in order to reduce connection contention from many thousands of message writer threads across a pool of servers all talking to a single target server. I’ve no doubt that a little fine tuning on the usage side may be required, but overall I’m very happy with the results.

I hope you enjoy these libraries and please contact me if you find any problems with them or need additional functionality. Better yet, jump onto GitHub and submit a pull request of your own. I am happy to evaluate and accept well thought out requests that are in line with my vision for keeping these libraries lightweight and easy to use.

One other note

I recently published ServiceMock, a tiny experimental mocking library that has surprisingly been downloaded over 500 times. If you’re one of the crazy ones, I’d love to hear from you and what you think of it.

ServiceMock a New ServiceWire Based Project

I know. There are some really great mocking libraries. The one I’ve used the most is Moq 4. While I’ve not been a regular user of mock libraries, I am fascinated with their usefulness, and I’ve recently been thinking about how I might utilize the ServiceWire dynamic proxy to create a simple and easily extended mock library. After a few hours of work this morning, the first experimental version of ServiceMock comes to life.

This is not a serious attempt to replace Moq or any other mocking library. It is for the most part a way to demonstrate how to use the dynamic proxy of ServiceWire to do something more than interception or remote procedure call (RPC). It is entirely experimental, but you can get it via NuGet as well.

With ServiceMock, you can now do something like this:

// create your interface
public interface IDoSomething
{
   void DoNoReturn(int a, int b);
   string DoSomeReturn(string a, string b);
}

// now mock and use the mock
// note: you don't have an implementation of the interface
class Program
{
   static void Main(string[] args)
   {
      var mock = Mock.Make<IDoSomething>();

      mock.DoNoReturn(4, 5);
      var mockReturnValue = mock.DoSomeReturn("a", "b");
      Console.WriteLine(mockReturnValue);

      Console.WriteLine("Press Enter to quit.");
      Console.ReadLine();
   }
}

To create a library that takes advantage of the ServiceWire dynamic proxy, you need a factory (Mock), a channel (MockChannel) that the dynamic proxy will invoke, a channel constructor parameter class (MockDefinition), and finally a function for invoke and one for exception handling should the invoke throw (MockActions). And of course, you can supply your own customized function and assign it to the MockActions instance.

The heart of the extensibility is the ability to inject your own “invoke” function via the instance of the MockActions class in the MockDefinition constructor parameter.

var mock = Mock.Make<IDoSomething>(new MockDefinition
{
   Id = 1,
   Actions = new MockActions
   {
      Invoke = 
         (id, methodName, returnType, parameters) =>
         {
            // do your thing here
            var retval = new object[parameters.Length + 1];

            // assign your return value to the first object
            // in the return array
            retval[0] = returnType.Name == "String"
               ? returnType.ToString()
               : TypeHelper.GetDefault(returnType);

            //by default, return all parameters as supplied
            for (int i = 0; i < parameters.Length; i++)
            {
               retval[i + 1] = parameters[i];
            }
            return retval;
         },
      InvokeExceptionHandler = 
         (id, methodName, returnType, parameters, exception) =>
         {
            //do your custom exception handler if your invoke throws
            return true; //return true if you want exception thrown 
            //return false if you want the exception buried
         }
   }
});

Here’s the default “invoke” code should you not wish to provide one.

(id, methodName, returnType, parameters) =>
   {
      Console.WriteLine(id + methodName);
      var retval = new object[parameters.Length + 1];
      
      //return params must have returnType 
      //as first element in the return values
      retval[0] = returnType.Name == "String" 
         ? returnType.ToString() 
         : TypeHelper.GetDefault(returnType);

      //by default, return all parameters as supplied
      for (int i = 0; i < parameters.Length; i++)
      {
         retval[i + 1] = parameters[i];
      }
      return retval;
   };

Of course, you might want to log the calls, aggregate counts per methodName or whatever you wish. I hope you find this useful, but I hope more that you will build your dynamic proxy wrapper for your own cool purposes.
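As a sketch of the call counting idea, here is a small class whose `Invoke` method follows the same shape as the functions above. The parameter types (`int`, `string`, `Type`, `object[]`) are my reading of those lambdas and are an assumption, as are the `CallCounter` and `CountFor` names. It also builds default values itself rather than calling `TypeHelper.GetDefault`.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: counts calls per method name and returns
// a default value plus the parameters, like the default invoke does.
public class CallCounter
{
    private readonly ConcurrentDictionary<string, int> _counts =
        new ConcurrentDictionary<string, int>();

    public object[] Invoke(int id, string methodName,
        Type returnType, object[] parameters)
    {
        _counts.AddOrUpdate(methodName, 1, (name, count) => count + 1);

        var retval = new object[parameters.Length + 1];

        // Slot 0 is the return value: a zeroed instance for value types,
        // null for void and for reference types.
        retval[0] = returnType.IsValueType && returnType != typeof(void)
            ? Activator.CreateInstance(returnType)
            : null;

        // Remaining slots echo the parameters as supplied.
        Array.Copy(parameters, 0, retval, 1, parameters.Length);
        return retval;
    }

    public int CountFor(string methodName)
    {
        int count;
        return _counts.TryGetValue(methodName, out count) ? count : 0;
    }
}
```

If the delegate type on `MockActions` matches this signature, an instance method like `counter.Invoke` could be assigned to it and the counts read back after the test.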

ServiceWire and ServiceMq Improvements

Over the past several days, I’ve been working on a day-job project that may be using ServiceMq which uses ServiceWire under the covers. I say “may” because it depends on whether the prototype proves to be sufficiently reliable and efficient. The prototype is really more of an extended integration test with the following requirements:

  • Blocking Send method that throws if sending fails.
    • Tries the primary destination first.
    • Alternative destinations are tried successively until the message is successfully sent.
    • Control over send connection timeout failure to enable “fail fast.”
  • Standard caching receive.

This is because the senders are transient and may not be restarted should they fail. The senders also need immediate feedback because the action is part of a transaction involving other operations.

The first order of business was to add the Flash class to ServiceMq. This evolved to become the Flasher class which implements IDisposable in order to take advantage of client connection pooling using the updated PooledDictionary class in ServiceWire (more on this later). The Flasher’s Send method allows you to send a message to a primary destination with zero to many alternate destinations.

[TestMethod]
public void FlashDestDownTcpTest()
{
   var qfrom = new Address(Dns.GetHostName(), 8966);
   var q1Address = new Address(Dns.GetHostName(), 8967);
   var q2Address = new Address(Dns.GetHostName(), 8968);

   // create and use Flasher in using to guarantee Dispose call
   using (var flash = new Flasher(qfrom))
   {
      // create a receiving queue
      using (var q2 = new MessageQueue("qf2", q2Address, @"c:\temp\qf2"))
      {
         // send to primary which does not exist - received on secondary q2
         var id = flash.Send(q1Address, "my test message", q2Address);
         var msg = q2.Receive();
         Assert.IsTrue(msg.Id == id);
      }

      using (var q1 = new MessageQueue("qf1", q1Address, @"c:\temp\qf1"))
      {
         // send to primary - received on primary
         var id = flash.Send(q1Address, "my test message", q2Address);
         var msg = q1.Receive();
         Assert.IsTrue(msg.Id == id);
      }

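      // demonstrate Send throws when neither receiver is "up"
      try
      {
         var id = flash.Send(q1Address, "my test message", q2Address);
         // reaching this line means Send did not throw, so fail the test
         Assert.Fail("Send should throw when no destination is available.");
      }
      catch (Exception e)
      {
         Assert.IsTrue(e is System.Net.WebException);
      }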
   }
}
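The behavior this test exercises (try the primary, then each alternate in turn, and throw only when all of them fail) can be sketched independently of ServiceMq. `FailoverSender` and its `Send` method are hypothetical names, the destinations are plain strings, and the actual send is passed in as a delegate. The real Flasher is not implemented this way as far as this post shows.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of "primary first, then alternates" sending.
public static class FailoverSender
{
    public static TResult Send<TResult>(Func<string, TResult> send,
        string primary, params string[] alternates)
    {
        var errors = new List<Exception>();
        foreach (var destination in new[] { primary }.Concat(alternates))
        {
            try
            {
                // First destination that accepts the message wins.
                return send(destination);
            }
            catch (Exception e)
            {
                errors.Add(e); // remember why this destination failed
            }
        }
        // Blocking contract: the caller learns right away that nothing worked.
        throw new AggregateException(
            "Send failed for all destinations.", errors);
    }
}
```

Collecting the individual failures into one `AggregateException` is a choice made for this sketch so that the caller can see why each destination was rejected.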

In order to support a more robust client side connection timeout, a significant improvement was made to ServiceWire. The TcpEndPoint class was introduced and an overloaded constructor was added to TcpChannel which allows you to specify a connection timeout value when creating an instance of TcpClient<T> (see below). This involved use of the Socket class’s ConnectAsync method with a SocketAsyncEventArgs object.

private void Initialize(Type serviceType, 
   IPEndPoint endpoint, int connectTimeoutMs)
{
   _serviceType = serviceType;
   _client = new Socket(AddressFamily.InterNetwork, 
                        SocketType.Stream, ProtocolType.Tcp);
   _client.LingerState.Enabled = false;

   var connected = false;
   var connectEventArgs = new SocketAsyncEventArgs
   {
      // must designate the server you want to connect to
      RemoteEndPoint = endpoint
   };
   connectEventArgs.Completed 
      += new EventHandler<SocketAsyncEventArgs>((sender, e) =>
         {
            connected = true;
         });

   if (_client.ConnectAsync(connectEventArgs))
   {
      //operation pending - (false means completed synchronously)
      while (!connected)
      {
         if (!SpinWait.SpinUntil(() => connected, connectTimeoutMs))
         {
            if (null != _client) _client.Dispose();
            throw new TimeoutException("Unable to connect within " 
                                       + connectTimeoutMs + "ms");
         }
      }
   }
   if (connectEventArgs.SocketError != SocketError.Success)
   {
      if (null != _client) _client.Dispose();
      throw new SocketException((int)connectEventArgs.SocketError);
   }

   if (!_client.Connected) throw new SocketException(); 
   _stream = new BufferedStream(new NetworkStream(_client), 8192);
   _binReader = new BinaryReader(_stream);
   _binWriter = new BinaryWriter(_stream);
   SyncInterface(_serviceType);
}
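For comparison, here is a standalone sketch of the same connect-with-timeout idea that waits on a `TaskCompletionSource` instead of spinning on a flag. It is an alternative written for illustration, not the ServiceWire code, and `TimedConnect` is a hypothetical name.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical alternative to the spin-wait version above.
public static class TimedConnect
{
    public static Socket Connect(IPEndPoint endpoint, int connectTimeoutMs)
    {
        var socket = new Socket(AddressFamily.InterNetwork,
            SocketType.Stream, ProtocolType.Tcp);
        var completed = new TaskCompletionSource<bool>();
        var args = new SocketAsyncEventArgs { RemoteEndPoint = endpoint };
        args.Completed += (sender, e) => completed.TrySetResult(true);

        // ConnectAsync returns true while the operation is still pending;
        // false means it finished synchronously and Completed will not fire.
        if (socket.ConnectAsync(args)
            && !completed.Task.Wait(connectTimeoutMs))
        {
            socket.Dispose();
            throw new TimeoutException(
                "Unable to connect within " + connectTimeoutMs + "ms");
        }

        if (args.SocketError != SocketError.Success)
        {
            socket.Dispose();
            throw new SocketException((int)args.SocketError);
        }
        return socket;
    }
}
```

`TrySetResult` is safe to call even after the wait has timed out, which matters because disposing the socket can still raise the Completed event afterwards.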

The final problem to solve was TCP/IP port exhaustion. My original implementation had the sending client being created and taken down with each call to the Send method. The construction overhead is minimal, but the connect time and port exhaustion quickly become a problem when there are potentially many threads sending messages from the same client machine.

To solve the problem, I used a connection pooling strategy that involved making the PooledDictionary<TKey, TValue> implement the IDisposable interface to allow for easy disposal of pooled client objects. The Send method then uses one of two client pools, depending on whether it is a local Named Pipes connection or a TCP connection.

private void SendMessage(OutboundMessage message)
{
   NpClient<IMessageService> npClient = null;
   TcpClient<IMessageService> tcpClient = null;
   IMessageService proxy = null;
   var poolKey = message.To.ToString();
   try
   {
      // determine whether to use NamedPipes or Tcp
      var useNpClient = false;
      if (message.To.Transport == Transport.Both)
      {
         if (message.To.ServerName == message.From.ServerName)
         {
            useNpClient = true;
         }
      }
      else if (message.To.Transport == Transport.Np) useNpClient = true;

      // request a client from the pool, providing a way to create a new one
      if (useNpClient)
      {
         npClient = npClientPool.Request(poolKey, 
            () => new NpClient<IMessageService>(
               new NpEndPoint(message.To.PipeName, _connectTimeOutMs)));
         proxy = npClient.Proxy;
      }
      else
      {
         tcpClient = tcpClientPool.Request(poolKey,
            () => new TcpClient<IMessageService>(new TcpEndPoint(
               new IPEndPoint(IPAddress.Parse(message.To.IpAddress), 
                  message.To.Port), _connectTimeOutMs)));
         proxy = tcpClient.Proxy;
      }

      // send the message via the proxy RPC Enqueue* method
      if (null == message.MessageBytes)
      {
         proxy.EnqueueString(message.Id, message.From.ToString(), 
            message.Sent, message.SendAttempts,
            message.MessageTypeName, message.MessageString);
      }
      else
      {
         proxy.EnqueueBytes(message.Id, message.From.ToString(), 
            message.Sent, message.SendAttempts,
            message.MessageTypeName, message.MessageBytes);
      }
   }
   finally
   {
      if (null != tcpClient) tcpClientPool.Release(poolKey, tcpClient);
      if (null != npClient) npClientPool.Release(poolKey, npClient);
   }
}
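The Request and Release pattern used above can be sketched with a keyed pool built on the concurrent collections. This is not the PooledDictionary from ServiceWire. `KeyedPool` is a hypothetical name, and the sketch leaves out limits on pool size and checks for stale connections.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical sketch of a keyed pool with Request/Release semantics.
public sealed class KeyedPool<TKey, TValue> : IDisposable
    where TValue : class, IDisposable
{
    private readonly ConcurrentDictionary<TKey, ConcurrentBag<TValue>> _pools =
        new ConcurrentDictionary<TKey, ConcurrentBag<TValue>>();

    // Hands out an idle item for the key, or creates one if none is idle.
    public TValue Request(TKey key, Func<TValue> create)
    {
        var bag = _pools.GetOrAdd(key, k => new ConcurrentBag<TValue>());
        TValue item;
        return bag.TryTake(out item) ? item : create();
    }

    // Returns an item so a later Request for the same key can reuse it.
    public void Release(TKey key, TValue item)
    {
        _pools.GetOrAdd(key, k => new ConcurrentBag<TValue>()).Add(item);
    }

    // Disposes every idle item; items still checked out are the caller's.
    public void Dispose()
    {
        foreach (var bag in _pools.Values)
        {
            TValue item;
            while (bag.TryTake(out item)) item.Dispose();
        }
        _pools.Clear();
    }
}
```

Keying the pool by destination, as `SendMessage` does with `message.To.ToString()`, means a reused client is always already connected to the right place.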

It remains to be seen whether these changes will result in a sufficiently robust message passing and queuing system to allow it to be used on my day job project. More testing and prototyping is required. There are alternatives to which I can fall back, but none of them are trivial and all of them are less desirable. Given my personal bias, I must take extra care to scrutinize this possible solution and abandon it should it prove to be insufficient.