tsJensen

A quest for software excellence...

ServiceWire 1.5.0 Released

ServiceWire is a very fast and lightweight service host and dynamic client library that simplifies the development and use of high-performance remote procedure call (RPC) communication between .NET processes over Named Pipes or TCP/IP.

The DuoVia.Net library has progressed significantly. But everyone I work with who uses it balks at the name. So I’ve renamed it. And I like the name very much. I hope you do too. It is ServiceWire. The name describes more aptly and intuitively what the library does. Hopefully this will help with adoption and participation.

I’ve laid out the documentation wiki on that site and will spend the next few days or weeks getting it completely fleshed out. The code is in a new repository and there is a new NuGet package which, with the exception of namespaces, is at perfect parity with DuoVia.Net version 1.5.0.

I’m very interested in getting your feedback on ServiceWire, the name and the library.

BufferedStream Improves .NET Sockets Performance

.NET’s BufferedStream saved the day, instantly reducing 200-400ms operations across the wire to 1ms. Why? Simple answer: a Socket gets wrapped in a NetworkStream. Wire up that stream to a reader and a writer, and you’re good to go, right? Wrong. Turns out the BinaryWriter and BinaryReader used in the code below work through a tiny internal buffer of just 16 bytes, so every primitive value goes to or comes from the underlying stream on its own. And NetworkStream has no buffer at all.

So if you have a TCP socket wired up to a NetworkStream, you’re trying to send or receive no more than 16 bytes at a time, utterly killing performance over TCP. Now magically wrap that NetworkStream in a BufferedStream, pass that BufferedStream into your reader and writer, and you get instant performance gains that will knock your sockets off.
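To see the effect without a network, here is a minimal sketch that counts how many Write calls reach the underlying stream with and without a BufferedStream in between. CountingStream and BufferedStreamDemo are hypothetical names made up for this illustration, and a MemoryStream subclass stands in for the NetworkStream, so it shows the call pattern only, not real latency.

```csharp
using System;
using System.IO;

// Hypothetical helper for this demo: counts how often Write reaches the
// underlying stream. It stands in for the NetworkStream.
public class CountingStream : MemoryStream
{
    public int WriteCalls;

    public override void Write(byte[] buffer, int offset, int count)
    {
        WriteCalls++;
        base.Write(buffer, offset, count);
    }
}

public static class BufferedStreamDemo
{
    // Writes 1,000 Int32 values through a BinaryWriter and returns how many
    // Write calls reached the underlying stream.
    public static int CountWrites(bool buffered)
    {
        var inner = new CountingStream();
        Stream target = buffered ? new BufferedStream(inner, 8192) : (Stream)inner;
        var writer = new BinaryWriter(target);
        for (int i = 0; i < 1000; i++) writer.Write(i);
        writer.Flush(); // pushes whatever the BufferedStream is still holding
        return inner.WriteCalls;
    }

    public static void Main()
    {
        Console.WriteLine("unbuffered: {0} writes", CountWrites(false));
        Console.WriteLine("buffered:   {0} writes", CountWrites(true));
    }
}
```

Without the BufferedStream, every 4-byte value becomes its own write on the underlying stream. With it, the 4,000 bytes fit inside the 8,192-byte buffer and go out together when the writer is flushed. On a real socket, each of those tiny writes can end up as its own send.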

Backstory: For months I’ve been writing and improving my DuoVia.Net fast services library. Recently my team at the day job began using it in some very clever ways (sorry, NDA and all), but we ran into a major performance problem. While performance on the same machine across processes using Named Pipes was excellent, the same was not true of machine-to-machine communications over TCP/IP. Sub-millisecond calls between services were taking 200-400ms across the wire. Something was terribly wrong. And when we tried Named Pipes from server to server, the performance problem went away. Of course, this was not the final answer because a .NET Named Pipes host can only handle 254 concurrent connections and we need to be able to scale beyond that.

Solving the problem required several sleepless nights and a weekend searching for the answer. My TCP/IP performance tests have always run locally on the localhost loopback stack. The trouble with that, I have since learned (and should have known), is that when running locally, Windows bypasses the TCP stack altogether, or nearly so, sufficiently at least to mask the underlying problem of reading and writing only 16 bytes at a time directly on the NetworkStream.

After examining a number of open source implementations of Sockets on a server host, I ran into one or two smart enough to be using the BufferedStream to wrap that NetworkStream that a raw Socket object gives you. While doing all of this research, I also ran into the MSDN explanation (see Remarks and Example section) of how to improve server side asynchronous Socket handling. So I threw that into the solution as well. Once wired up and tested across machines on my home network, I breathed a huge sigh of relief. And here is what the code looks like now. First server and then client.

using System;
using System.Net.Sockets;
using System.Net;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace DuoVia.Net.TcpIp
{
  public class TcpHost : Host
  {
    private Socket _listener;
    private IPEndPoint _endPoint;
    private ManualResetEvent _listenResetEvent = new ManualResetEvent(false);

    /// <summary>
    /// Constructs an instance of the host and starts listening for 
    /// incoming connections on any ip address.
    /// All listener threads are regular background threads.
    /// </summary>
    /// <param name="port">The port number for incoming requests</param>
    /// <param name="log"></param>
    /// <param name="stats"></param>
    public TcpHost(int port, ILog log = null, IStats stats = null)
    {
      Initialize(new IPEndPoint(IPAddress.Any, port), log, stats);
    }

    /// <summary>
    /// Constructs an instance of the host and starts listening for incoming 
    /// connections on designated endpoint.
    /// All listener threads are regular background threads.
    /// 
    /// NOTE: the instance created from the specified type 
    /// is not automatically thread safe!
    /// </summary>
    /// <param name="endpoint"></param>
    /// <param name="log"></param>
    /// <param name="stats"></param>
    public TcpHost(IPEndPoint endpoint, ILog log = null, IStats stats = null)
    {
      Initialize(endpoint, log, stats);
    }

    private void Initialize(IPEndPoint endpoint, ILog log, IStats stats)
    {
      base.Log = log;
      base.Stats = stats;
      _endPoint = endpoint;
      _listener = new Socket(AddressFamily.InterNetwork, 
        SocketType.Stream, ProtocolType.Tcp);
      _listener.SetSocketOption(SocketOptionLevel.Socket, 
        SocketOptionName.KeepAlive, true);
      _listener.SetSocketOption(SocketOptionLevel.Socket, 
        SocketOptionName.DontLinger, true);
    }

    /// <summary>
    /// Gets the end point this host is listening on
    /// </summary>
    public IPEndPoint EndPoint
    {
      get { return _endPoint; }
    }

    protected override void StartListener()
    {
      Task.Factory.StartNew(Listen, TaskCreationOptions.LongRunning);
    }

    private SocketAsyncEventArgs _acceptEventArg;

    /// <summary>
    /// Listens for incoming tcp requests.
    /// </summary>
    private void Listen()
    {
      try
      {
        _listener.Bind(_endPoint);
        _listener.Listen(8192);

        _acceptEventArg = new SocketAsyncEventArgs();
        _acceptEventArg.Completed 
          += new EventHandler<SocketAsyncEventArgs>
            (acceptEventArg_Completed);

        while (!_disposed)
        {
          // Set the event to nonsignaled state.
          _listenResetEvent.Reset();
          _acceptEventArg.AcceptSocket = null;
          try
          {
            if (!_listener.AcceptAsync(_acceptEventArg))
            {
              AcceptNewClient(_acceptEventArg);
            }
          }
          catch (Exception ex)
          {
            _log.Error("Listen error: {0}", 
              ex.ToString().Flatten());
            break; //break loop on unhandled
          }

          // Wait until a connection is made before continuing.
          _listenResetEvent.WaitOne();
        }
      }
      catch (Exception e)
      {
        _log.Fatal("Listen fatal error: {0}", e.ToString().Flatten());
      }
    }

    private void acceptEventArg_Completed(object sender, 
      SocketAsyncEventArgs e)
    {
      AcceptNewClient(e);
    }

    private void AcceptNewClient(SocketAsyncEventArgs e)
    {
      try
      {
        if (e.SocketError != SocketError.Success)
        {
          if (!_disposed) _listenResetEvent.Set();
          return;
        }

        Socket activeSocket = null;
        BufferedStream stream = null;
        try
        {
          activeSocket = e.AcceptSocket;

          // Signal the listening thread to continue.
          _listenResetEvent.Set();

          stream = new BufferedStream
            (new NetworkStream(activeSocket), 8192);
          base.ProcessRequest(stream);
        }
        catch (Exception ex)
        {
          _log.Error("AcceptNewClient_ProcessRequest error: {0}", 
            ex.ToString().Flatten());
        }
        finally
        {
          if (null != stream)
          {
            stream.Close();
          }
          if (null != activeSocket && activeSocket.Connected)
          {
            try
            {
              activeSocket.Shutdown(SocketShutdown.Both);
            }
            catch (Exception shutdownException)
            {
              _log.Error("AcceptNewClient_ActiveSocketShutdown error: {0}", 
                shutdownException.ToString().Flatten());
            }

            try
            {
              activeSocket.Close();
            }
            catch (Exception closeException)
            {
              _log.Error("AcceptNewClient_ActiveSocketClose error: {0}", 
                closeException.ToString().Flatten());
            }
          }
        }
      }
      catch (Exception fatalException)
      {
        _log.Fatal("AcceptNewClient fatal error: {0}", 
          fatalException.ToString().Flatten());
      }
    }

    #region IDisposable Members

    private bool _disposed = false;

    protected override void Dispose(bool disposing)
    {
      if (!_disposed)
      {
        _disposed = true; //prevent second call to Dispose
        if (disposing)
        {
          _listenResetEvent.Set();
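          //Listen may never have run, in which case there is nothing to dispose
          if (null != _acceptEventArg) _acceptEventArg.Dispose();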
          _listener.Close();
          _listenResetEvent.Close();
        }
      }
      base.Dispose(disposing);
    }

    #endregion
  }
}

Client code:

using System;
using System.Net;
using System.Net.Sockets;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;

namespace DuoVia.Net.TcpIp
{
  public class TcpChannel : StreamingChannel
  {
    private Socket _client;

    /// <summary>
    /// Creates a connection to the concrete object handling 
    /// method calls on the server side
    /// </summary>
    /// <param name="serviceType"></param>
    /// <param name="endpoint"></param>
    public TcpChannel(Type serviceType, IPEndPoint endpoint)
    {
      _serviceType = serviceType;
      _client = new Socket(AddressFamily.InterNetwork, 
        SocketType.Stream, ProtocolType.Tcp);
      _client.LingerState.Enabled = false;
      _client.Connect(endpoint);
      if (!_client.Connected) throw new SocketException(); 
      _stream = new BufferedStream(new NetworkStream(_client), 8192);
      _binReader = new BinaryReader(_stream);
      _binWriter = new BinaryWriter(_stream);
      _formatter = new BinaryFormatter();
      SyncInterface(_serviceType);
    }

    public override bool IsConnected 
    { 
      get 
      { 
        return (null != _client) && _client.Connected; 
      } 
    }

    #region IDisposable override

    protected override void Dispose(bool disposing)
    {
      base.Dispose(disposing);
      if (disposing)
      {
        _binReader.Close();
        _binWriter.Close();
        _client.Close();
      }
    }

    #endregion
  }
}

You can find all of the ServiceWire code on GitHub or install the package from NuGet.

My Technical 2013

Technically speaking, I had a fun and productive 2013. Here are some highlights worth mentioning.

StorageClient: Client Side Load Balancing

A technology-specific problem solved by bypassing a server-based solution in favor of client-side load balancing and fast-fail retry algorithms, which took us from horrible to nearly five nines in reliability while improving overall performance. (This was at the day job, so that's about as much as I can share about that.)
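The general shape of client-side load balancing with fast-fail retry can be sketched as below. This is not the day job code, which can't be shared. RoundRobinClient and everything in it are hypothetical names, and plain round-robin is just one assumed way to pick the next endpoint.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: rotate over a fixed set of endpoints and, when a call
// fails, move straight on to the next endpoint instead of waiting.
public class RoundRobinClient
{
    private readonly string[] _endpoints;
    private int _next = -1;

    public RoundRobinClient(params string[] endpoints)
    {
        if (endpoints == null || endpoints.Length == 0)
            throw new ArgumentException("At least one endpoint is required");
        _endpoints = endpoints;
    }

    // Makes at most one attempt per configured endpoint, starting with the
    // next endpoint in the rotation.
    public TResult Execute<TResult>(Func<string, TResult> call)
    {
        var errors = new List<Exception>();
        for (int attempt = 0; attempt < _endpoints.Length; attempt++)
        {
            // The uint cast keeps the index valid even if the counter wraps.
            int index = (int)((uint)Interlocked.Increment(ref _next) % (uint)_endpoints.Length);
            try
            {
                return call(_endpoints[index]);
            }
            catch (Exception ex)
            {
                errors.Add(ex); // fail fast: try the next endpoint immediately
            }
        }
        throw new AggregateException("All endpoints failed", errors);
    }
}

public static class LoadBalanceDemo
{
    public static void Main()
    {
        var client = new RoundRobinClient("a", "b", "c");
        // Endpoint "a" is simulated as down; the call falls through to "b".
        string served = client.Execute(ep =>
        {
            if (ep == "a") throw new InvalidOperationException("down");
            return ep;
        });
        Console.WriteLine("served by {0}", served);
    }
}
```

A production version would also need per-call timeouts and some memory of which endpoints failed recently. Both are left out here to keep the sketch short.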

LocalCache: In Memory Cache with Async Persistence

A library that takes advantage of Concurrent Collections in .NET and SQLite to provide fast in-memory caching that persists asynchronously on local disk for rapid rehydration of in-memory cache when an application pool is recycled. This solved a big problem with service level compliance recovery on a critical service, taking complete recovery time from hours to a few minutes. (Also for the day job.)

DuoVia.Net: TCP and NamedPipes Services Library

An extension and revival of RemotingLite that makes inter-process communication easy and fast. This was my first foray into creating and sharing open source software on GitHub and publishing packages on NuGet. I enjoyed it so much, I added 8 more packages to the set. And while these projects were built on my own time, one or two of them are in regular use by one or two teams at the current day job and they have been downloaded over 1,500 times.

VersionedCollections: A Shared Idea Brought to Life

Recently I shared an idea with Ayende Rahien on his blog with respect to creating a snapshot-in-time, read-only view of a collection that is being written to constantly. I'm happy to report that it turned out to be exactly what he needed. And I am honored and appreciative to Ayende for the kudos. Sharing good ideas with community friends is almost as much fun as bringing them to life yourself.

Here's to an equally fun and productive 2014.

PooledDictionary<TKey, TValue> - A Thread Safe Object Pool by Key

Work on my DuoVia.Net and DuoVia.MpiVisor projects has been progressing well. I now have an opportunity to use the libraries in a significant use case at Ancestry.com, my day job, which has been very useful in finding ways to improve the libraries. (Disclaimer: Ancestry.com does not endorse the DuoVia library and I am not a spokesman for Ancestry.)

In its first incarnation, the ProxyFactory created a new dynamic assembly each time it created a proxy. This was expensive in terms of creation time but more so in terms of memory once many thousands of proxies had been created. Assemblies are kept in memory for the life of the process.

First I tried a Dictionary of ProxyBuilder objects. ProxyBuilder is the container class for the objects needed to create an instance of the dynamically generated assembly’s proxy type that implements the target interface. I put a lock around this Dictionary, but of course that created a bottleneck that any heavily multithreaded application creating many proxy connections would run into.

Next I tried a ThreadStatic instance of that Dictionary, keeping a ProxyBuilder for each key type. This eliminated the bottleneck but necessitated the creation of many more ProxyBuilder objects than necessary, with no guarantee that these objects would ever be used more than once. In a multithreaded client using thread pool threads or its own threads, over time, memory usage and performance would be negatively impacted.

Pooling was the answer. But how can you pool objects of the same base type by key rather than type? There are many object pool examples to be found but all those that I found were based on type alone. Time to roll my own. And PooledDictionary<TKey, TValue> is what I came up with.

First, the code that uses it, so you can get a feel for how easy it is to use. Note that the Request method’s Func<TValue> parameter calls CreateProxyBuilder, the costly and complex method that creates the dynamic assembly and collects into the ProxyBuilder the objects required to create an instance of the proxy for the target interface. The function is only used to create a new ProxyBuilder if the pool is depleted.

Using the PooledDictionary<TKey, TValue>

private static PooledDictionary<string, ProxyBuilder> _proxies = 
  new PooledDictionary<string, ProxyBuilder>();

public static TInterface CreateProxy<TInterface>(Type channelType, 
  Type ctorArgType, object channelCtorValue) where TInterface : class
{
  if (!channelType.InheritsFrom(typeof(Channel))) 
  {
    throw new ArgumentException("channelType does not inherit from Channel");
  }
  Type interfaceType = typeof(TInterface);
  var proxyName = interfaceType.FullName + channelType.FullName + ctorArgType.FullName;

  //get pooled proxy builder
  var localChannelType = channelType;
  var localCtorArgType = ctorArgType;
  ProxyBuilder proxyBuilder = _proxies.Request(proxyName, () => 
    CreateProxyBuilder(proxyName, interfaceType, localChannelType, localCtorArgType));

  //create proxy
  var proxy = CreateProxy<TInterface>(proxyBuilder, channelCtorValue);

  //return builder to the pool
  _proxies.Release(proxyName, proxyBuilder);

  return proxy;
}

And now the code that makes the magic happen. Note the use of the System.Collections.Concurrent namespace. While sometimes heavy, these collections really do have their place on the parallel programmer’s palette.

PooledDictionary<TKey, TValue>

using System;
using System.Collections.Concurrent;

public class PooledDictionary<TKey, TValue> 
{
  private readonly ConcurrentDictionary<TKey, ConcurrentQueue<TValue>> _dq;
  private readonly int _concurrencyLevel;
  private readonly int _size;

  public PooledDictionary()
  {
    _concurrencyLevel = Environment.ProcessorCount * 8;
    _size = _concurrencyLevel * _concurrencyLevel;
    _dq = new ConcurrentDictionary<TKey, ConcurrentQueue<TValue>>(_concurrencyLevel, _size);
  }

  //gets the queue for the key, creating it atomically on first use
  private ConcurrentQueue<TValue> GetQueue(TKey key)
  {
    return _dq.GetOrAdd(key, k => new ConcurrentQueue<TValue>());
  }

  public void Add(TKey key, TValue value)
  {
    GetQueue(key).Enqueue(value);
  }

  public int Count(TKey key)
  {
    return GetQueue(key).Count;
  }

  public TValue Request(TKey key, Func<TValue> creator = null)
  {
    TValue v;
    if (GetQueue(key).TryDequeue(out v)) return v;
    //pool is depleted: create a new value if a creator was supplied
    return (null != creator) ? creator() : default(TValue);
  }

  public void Release(TKey key, TValue value)
  {
    Add(key, value); //just adds it back to key's queue
  }
}

I hope this little gem is as useful to you as it has been to me. In tests and production, I have found it to be as fast or faster than the ThreadStatic collection approach. And memory consumption in production has returned to satisfactory levels because we are now creating only the number of ProxyBuilders that we need and using those efficiently.
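To make the "only the number we need" point concrete, here is a small self-contained sketch that counts how often the creator function actually runs. KeyedPool is a condensed, hypothetical stand-in for PooledDictionary<TKey, TValue> so the sample compiles on its own, and a plain object stands in for the ProxyBuilder.

```csharp
using System;
using System.Collections.Concurrent;

// Condensed stand-in for PooledDictionary<TKey, TValue>: one queue per key.
public class KeyedPool<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, ConcurrentQueue<TValue>> _queues =
        new ConcurrentDictionary<TKey, ConcurrentQueue<TValue>>();

    private ConcurrentQueue<TValue> GetQueue(TKey key)
    {
        return _queues.GetOrAdd(key, k => new ConcurrentQueue<TValue>());
    }

    // Hands out a pooled value, or creates one when the key's queue is empty.
    public TValue Request(TKey key, Func<TValue> creator)
    {
        TValue value;
        return GetQueue(key).TryDequeue(out value) ? value : creator();
    }

    public void Release(TKey key, TValue value)
    {
        GetQueue(key).Enqueue(value);
    }
}

public static class PoolDemo
{
    // Performs the given number of sequential requests for one key and
    // returns how many times the creator had to build a new object.
    public static int CountCreations(int requests, bool releaseEachTime)
    {
        var pool = new KeyedPool<string, object>();
        int created = 0;
        for (int i = 0; i < requests; i++)
        {
            object builder = pool.Request("IMyService", () => { created++; return new object(); });
            if (releaseEachTime) pool.Release("IMyService", builder);
        }
        return created;
    }

    public static void Main()
    {
        Console.WriteLine("released each time: {0} created", CountCreations(100, true));
        Console.WriteLine("never released:     {0} created", CountCreations(3, false));
    }
}
```

When every request is followed by a release, one object serves all one hundred requests. When nothing is released, each request has to create its own. Under real concurrency the pool grows to about the peak number of builders in use at once, which is the behavior described above.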

Distributed Parallel Processing in Simple .NET Console Application

You write a simple console application. Debug it locally. And flip a switch and run it on all your servers and workstations running MpiVisor Server. This demo is part of the original source on GitHub. It has a small Main class, a "master" runner class and a "spawned" runner class. More on how it works in the next post. For now, just enjoy how easy the code is. Read on to see the code.

static void Main(string[] args)
{
    //connect agent and dispose at end of execution
    //use forceLocal to run in a single process with internal visor
    using (Agent.Connect(forceLocal: false)) 
    {
        //default is File only - spawned agents shuttle logs back to master
        Log.LogType = LogType.Both; 
        if (Agent.Current.IsMaster)
        {
            try
            {
                //keep Main clean with master message loop class
                MasterRunner.Run(args);
            }
            catch (Exception e)
            {
                Log.Error("Agent master exception: {0}", e);
            }
        }
        else
        {
            try
            {
                //keep Main clean with spawned agent message loop class
                SpawnRunner.Run(args);
            }
            catch (Exception e)
            {
                Log.Error("spawn agent exception: {0}", e);
                Agent.Current.Send(new Message(Agent.Current.SessionId, 
                    Agent.Current.AgentId,
                    MpiConsts.MasterAgentId, 
                    SystemMessageTypes.Aborted, 
                    e.ToString()));
            }
        }
    }
} //standard ending - will force service to kill spawned agents  


internal static class MasterRunner
{
    private static ushort numberOfAgentsToSpawn = 2;

    //use to stop message processing loop
    private static bool continueProcessing = true;

    //additional means of determining when to stop processing loop
    private static ushort spawnedAgentsThatHaveStoppedRunning = 0;

    public static void Run(string[] args)
    {
        //spawn worker agents, send messages and orchestrate work
        Agent.Current.SpawnAgents(numberOfAgentsToSpawn, args);
        Message msg;
        do
        {
            msg = Agent.Current.ReceiveAnyMessage();
            if (msg == null) continue; //we timed out
            switch (msg.MessageType)
            {
                //handle content types > -1 which are application specific
                //case 0-~:
                    //handle messages from master or other agents here
                    //break;
                case 2:
                    //handle messages from master or other agents here
                    Log.Info("AgentId {0} sent message type 2 with {1}", 
                        msg.FromId, msg.Content);

                    //this test/demo just sends the message back to the sender
                    Agent.Current.Send(new Message
                        {
                            FromId = Agent.Current.AgentId,
                            SessionId = Agent.Current.SessionId,
                            ToId = msg.FromId,
                            MessageType = SystemMessageTypes.Shutdown,
                            Content = null
                        });
                    break;

                //handle internal messages and unknown
                case SystemMessageTypes.Started:
                    Log.Info("AgentId {0} reports being started.", msg.FromId);
                    //send demo/test content message
                    Agent.Current.Send(new Message
                    {
                        FromId = Agent.Current.AgentId,
                        SessionId = Agent.Current.SessionId,
                        ToId = msg.FromId,
                        MessageType = 1,  //custom app type
                        Content = "hello from 1"
                    });
                    break;
                case SystemMessageTypes.Stopped:
                    Log.Info("AgentId {0} reports being stopped.", msg.FromId);
                    spawnedAgentsThatHaveStoppedRunning++;
                    break;
                case SystemMessageTypes.Aborted:
                    Log.Info("AgentId {0} reports being aborted.", msg.FromId);
                    spawnedAgentsThatHaveStoppedRunning++;
                    break;
                case SystemMessageTypes.Error:
                    Log.Info("AgentId {0} reports an error.", msg.FromId);
                    break;
                default:
                    Log.Info("AgentId {0} sent {1} with {2}", 
                        msg.FromId, msg.MessageType, msg.Content);
                    break;
            }
        }
        while (continueProcessing 
            && spawnedAgentsThatHaveStoppedRunning < numberOfAgentsToSpawn);
        
        //change while logic as desired to keep master running 
        //or shut it down and all other agents will as well
        Log.Info("done master");
    }
}

internal static class SpawnRunner
{
    private static bool continueProcessing = true;

    public static void Run(string[] args)
    {
        Message msg;
        do
        {
            msg = Agent.Current.ReceiveAnyMessage();
            if (msg == null) continue; //we timed out
            switch (msg.MessageType)
            {
                //handle content types > -1 which are application specific
                case 1:
                    //handle messages from master or other agents here
                    Log.Info("AgentId {0} sent message type 1 with {1}", 
                        msg.FromId, msg.Content);

                    //this test/demo just sends the message back to the sender
                    Agent.Current.Send(new Message
                        {
                            FromId = Agent.Current.AgentId,
                            SessionId = Agent.Current.SessionId,
                            ToId = msg.FromId,
                            MessageType = 2,
                            Content = msg.Content.ToString() + " received"
                        });
                    break;

                //handle internal messages and unknown
                case SystemMessageTypes.Shutdown:
                    Log.Info("AgentId {0} sent shut down message", msg.FromId);
                    continueProcessing = false;
                    break;
                default:
                    Log.Info("AgentId {0} sent {1} with {2}", 
                        msg.FromId, msg.MessageType, msg.Content);
                    break;
            }
        }
        while (continueProcessing);
        Log.Info("work done");
    }
}
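The exchange between the two runner classes above (started, application message 1, reply 2, shutdown, stopped) can be traced in a single process. This is only a sketch of the message flow under made-up names: Msg, LoopDemo and the negative type codes are hypothetical, two BlockingCollection queues stand in for the visor, and nothing here uses the DuoVia.MpiVisor API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a message; not the MpiVisor Message type.
public class Msg
{
    public int Type;
    public string Content;
}

public static class LoopDemo
{
    // Made-up system type codes, chosen only for this sketch.
    const int Started = -1, Shutdown = -2, Stopped = -3;

    public static List<string> Run()
    {
        var toMaster = new BlockingCollection<Msg>();
        var toWorker = new BlockingCollection<Msg>();
        var log = new List<string>(); // only touched by the master loop

        // The "spawned agent": announces itself, echoes type 1, stops on Shutdown.
        var worker = Task.Factory.StartNew(() =>
        {
            toMaster.Add(new Msg { Type = Started });
            bool running = true;
            while (running)
            {
                Msg m = toWorker.Take();
                switch (m.Type)
                {
                    case 1:
                        toMaster.Add(new Msg { Type = 2, Content = m.Content + " received" });
                        break;
                    case Shutdown:
                        running = false;
                        break;
                }
            }
            toMaster.Add(new Msg { Type = Stopped });
        });

        // The "master": reacts to each message until the worker reports Stopped.
        bool done = false;
        while (!done)
        {
            Msg m = toMaster.Take();
            switch (m.Type)
            {
                case Started:
                    log.Add("started");
                    toWorker.Add(new Msg { Type = 1, Content = "hello from 1" });
                    break;
                case 2:
                    log.Add(m.Content);
                    toWorker.Add(new Msg { Type = Shutdown });
                    break;
                case Stopped:
                    log.Add("stopped");
                    done = true;
                    break;
            }
        }
        worker.Wait();
        return log;
    }

    public static void Main()
    {
        foreach (string line in Run()) Console.WriteLine(line);
    }
}
```

Each side only ever reacts to the message it just received, so the order of events is fixed even though the two loops run on different threads.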

Distributed Parallel Computing for .NET with DuoVia.MpiVisor

I have not posted on my blog for some time. I’ve been busy. Work. Family. And DuoVia.Net. DuoVia Inc. has been my little corp-to-corp contracting vehicle, which largely lies dormant while I’m employed in any other arrangement. I have no plans to change my current employment arrangement, but I am transforming DuoVia into an organization for open source .NET services and tools.

It has taken a large number of weekends and evenings to pull it off, but I am now very excited to present DuoVia’s first three open source projects, new web site, and new vision. The site is nothing really to brag about. It is simple and clean. The vision is evolving. But the three very cool projects are something else.

DuoVia.MpiVisor
Don’t let anybody lie to you. Doing distributed parallel computing (DPC) on HPC or Hadoop or MPI can be very hard. Very complex. And for advanced DPC systems, that is not likely to change.

But let’s face it. Many of the chores we need a distributed computing system for are small, down and dirty, one-off projects where something hard needs to be done once or perhaps occasionally. But the problem just doesn’t merit the hard work or the budget required to use an enterprise class system. So as often as not, the project just doesn’t get done or you solve the problem in a far less efficient and satisfying way.

That’s where DuoVia.MpiVisor comes in. You can get the “agent” library as a NuGet package. This will give you everything you need to write and debug your DPC application in Visual Studio without having to deploy it to your distributed processing servers or workstations. Once you’re ready to rock and roll, just get the code for the server at GitHub and run it on as many machines as you need.

Read more on the new web site www.duovia.net.

DuoVia.Net
Did you ever need to just write a simple interface, a couple of DTOs and a simple implementation of a service, stand it up and use it without any configuration fuss? Don’t want to bother configuring WCF or spinning up a RESTful interface just to have two .NET apps exchange some data fast and easy? Look no further. Get the source code at GitHub or download the NuGet package today.

DuoVia.FuzzyStrings
Do you need to know if “Jensn” is probably equal to “Jensen”? Then the DuoVia.FuzzyStrings library is the perfect choice. You can download the source at GitHub or get the NuGet package.
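For a feel of what a "probably equal" comparison involves, here is a minimal sketch using plain Levenshtein edit distance. It is not the DuoVia.FuzzyStrings API: FuzzySketch, ProbablyEqual and the maxEdits threshold are hypothetical names and choices made for illustration only.

```csharp
using System;

public static class FuzzySketch
{
    // Classic two-row Levenshtein distance: the number of single-character
    // insertions, deletions and substitutions needed to turn a into b.
    public static int Levenshtein(string a, string b)
    {
        var prev = new int[b.Length + 1];
        var curr = new int[b.Length + 1];
        for (int j = 0; j <= b.Length; j++) prev[j] = j;

        for (int i = 1; i <= a.Length; i++)
        {
            curr[0] = i;
            for (int j = 1; j <= b.Length; j++)
            {
                int cost = a[i - 1] == b[j - 1] ? 0 : 1;
                curr[j] = Math.Min(
                    Math.Min(curr[j - 1] + 1, prev[j] + 1), // insert, delete
                    prev[j - 1] + cost);                    // substitute or match
            }
            var tmp = prev; prev = curr; curr = tmp;
        }
        return prev[b.Length];
    }

    // Assumed rule for this sketch: equal if within maxEdits, ignoring case.
    public static bool ProbablyEqual(string a, string b, int maxEdits = 1)
    {
        return Levenshtein(a.ToLowerInvariant(), b.ToLowerInvariant()) <= maxEdits;
    }

    public static void Main()
    {
        Console.WriteLine(Levenshtein("Jensn", "Jensen"));    // one missing 'e'
        Console.WriteLine(ProbablyEqual("Jensn", "Jensen"));
        Console.WriteLine(ProbablyEqual("Hansen", "Jensen"));
    }
}
```

"Jensn" is one insertion away from "Jensen", so it passes the one-edit threshold, while "Hansen" needs two substitutions and does not. A fixed edit threshold is crude for short names, which is one reason to reach for a library that combines several measures.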

Demo and Test Code on GitHub
If you want to learn how to use these libraries before I have time to fully document them and publish that on the site, please do go to GitHub and download the source. In each project you will find the source for a demo or test which shows just how easy these libraries are to use.

Apologies for the sales pitch, but I really do want you to visit the new web site and check it all out.

Of course I will continue to blog here about various and random .NET development topics. But I invite you to check out the new web site and more especially to give the new projects a try. You may find them useful and when you do, I hope to hear from you about your triumphs.