tsJensen

A quest for software excellence...

Distributed Parallel Processing in Simple .NET Console Application

You write a simple console application. Debug it locally. And flip a switch and run it on all your servers and workstations running MpiVisor Server. This demo is part of the original source on GitHub. It is has a small Main class, a "master" runner class and a "spawned" runner class. More on how it works in the next post. For now, just enjoy how easy the code is. Read on to see the code.

static void Main(string[] args)
{
    //connect agent and dispose at end of execution
    //use forceLocal to run in a single process with internal visor
    using (Agent.Connect(forceLocal: false)) 
    {
        //default is File only - spawned agents shuttle logs back to master
        Log.LogType = LogType.Both; 
        if (Agent.Current.IsMaster)
        {
            try
            {
                //keep Main clean with master message loop class
                MasterRunner.Run(args);
            }
            catch (Exception e)
            {
                Log.Error("Agent master exception: {0}", e);
            }
        }
        else
        {
            try
            {
                //keep Main clean with spawned agent message loop class
                SpawnRunner.Run(args);
            }
            catch (Exception e)
            {
                Log.Error("spawn agent exception: {0}", e);
                Agent.Current.Send(new Message(Agent.Current.SessionId, 
                    Agent.Current.AgentId,
                    MpiConsts.MasterAgentId, 
                    SystemMessageTypes.Aborted, 
                    e.ToString()));
            }
        }
    }
} //standard ending - will force service to kill spawned agents  


internal static class MasterRunner
{
    private static ushort numberOfAgentsToSpawn = 2;

    //use to stop message processing loop
    private static bool continueProcessing = true;

    //additional means of determining when to stop processing loop
    private static ushort spawnedAgentsThatHaveStoppedRunning = 0;

    public static void Run(string[] args)
    {
        //spawn worker agents, send messages and orchestrate work
        Agent.Current.SpawnAgents(numberOfAgentsToSpawn, args);
        Message msg;
        do
        {
            msg = Agent.Current.ReceiveAnyMessage();
            if (msg == null) continue; //we timed out
            switch (msg.MessageType)
            {
                //handle content types > -1 which are application specific
                //case 0-~:
                    //handle messages from master or other agents here
                    //break;
                case 2:
                    //handle messages from master or other agents here
                    Log.Info("AgentId {0} sent message type 2 with {1}", 
                        msg.FromId, msg.Content);

                    //this test/demo just sends the message back to the sender
                    Agent.Current.Send(new Message
                        {
                            FromId = Agent.Current.AgentId,
                            SessionId = Agent.Current.SessionId,
                            ToId = msg.FromId,
                            MessageType = SystemMessageTypes.Shutdown,
                            Content = null
                        });
                    break;

                //handle internal messages and unknown
                case SystemMessageTypes.Started:
                    Log.Info("AgentId {0} reports being started.", msg.FromId);
                    //send demo/test content message
                    Agent.Current.Send(new Message
                    {
                        FromId = Agent.Current.AgentId,
                        SessionId = Agent.Current.SessionId,
                        ToId = msg.FromId,
                        MessageType = 1,  //custom app type
                        Content = "hello from 1"
                    });
                    break;
                case SystemMessageTypes.Stopped:
                    Log.Info("AgentId {0} reports being stopped.", msg.FromId);
                    spawnedAgentsThatHaveStoppedRunning++;
                    break;
                case SystemMessageTypes.Aborted:
                    Log.Info("AgentId {0} reports being aborted.", msg.FromId);
                    spawnedAgentsThatHaveStoppedRunning++;
                    break;
                case SystemMessageTypes.Error:
                    Log.Info("AgentId {0} reports an error.", msg.FromId);
                    break;
                default:
                    Log.Info("AgentId {0} sent {1} with {2}", 
                        msg.FromId, msg.MessageType, msg.Content);
                    break;
            }
        }
        while (continueProcessing 
            && spawnedAgentsThatHaveStoppedRunning < numberOfAgentsToSpawn);
        
        //change while logic as desired to keep master running 
        //or shut it down and all other agents will as well
        Log.Info("done master");
    }
	
	
internal static class SpawnRunner	
{	
    private static bool continueProcessing = true;	

    public static void Run(string[] args)
    {
        Message msg;
        do
        {
            msg = Agent.Current.ReceiveAnyMessage();
            if (msg == null) continue; //we timed out
            switch (msg.MessageType)
            {
                //handle content types > -1 which are application specific
                case 1:
                    //handle messages from master or other agents here
                    Log.Info("AgentId {0} sent message type 1 with {1}", 
                        msg.FromId, msg.Content);

                    //this test/demo just sends the message back to the sender
                    Agent.Current.Send(new Message
                        {
                            FromId = Agent.Current.AgentId,
                            SessionId = Agent.Current.SessionId,
                            ToId = msg.FromId,
                            MessageType = 2,
                            Content = msg.Content.ToString() + " received"
                        });
                    break;

                //handle internal messages and unknown
                case SystemMessageTypes.Shutdown:
                    Log.Info("AgentId {0} sent shut down message", msg.FromId);
                    continueProcessing = false;
                    break;
                default:
                    Log.Info("AgentId {0} sent {1} with {2}", 
                        msg.FromId, msg.MessageType, msg.Content);
                    break;
            }
        }
        while (continueProcessing);
        Log.Info("work done");
    }
}