Critical Development

Language design, framework development, UI design, robotics and more.

Concurrency & Coordination With Futures in C#

Posted by Dan Vanderboom on July 3, 2008

A future is a proxy or placeholder for a value that may not yet be known, usually because the calculation is time consuming.  It is used as a synchronization construct, and it is an effective way to define dependencies among computations that will execute when all of their factors have been calculated, or in other words, to construct an expression tree with each node potentially computing in parallel.  According to the Wikipedia article on futures and promises, using them can dramatically reduce latency in distributed systems.

Damon pointed out that the Parallel Extensions library contains a Future<T> class, so I started looking around for examples and explanations of how they work, what the syntax is like, and I ran across a frightening example implementing the asynchronous programming model with Future<T>, as well as going in the other direction, wrapping an APM implementation with Future<T>.  Other articles give pretty good explanations but trivial examples.  From what I gathered briefly, the ContinueWith method for specifying the next step of calculation to process doesn’t seem to provide an intuitive way to indicate that several calculations may be depending upon the current one (unless it can be called multiple times?).  Using ContinueWith, you’re always specifying forward the calculation task that depends on the current future object.  It also surprised me a little that Future inherits from Task, because my understanding of a future is that it’s primarily defined as a value-holding object.  But considering that a future really holds an expression that needs to be calculated, making Future a Task doesn’t seem so odd.

So I decided to implement my own Future<T> class before looking at the parallel extensions library too deeply.  I didn’t want to prejudice my solution, because I wanted to make an exercise of it and see what I would naturally come up with.

Though I tried avoiding prejudice, I still wound up characterizing it in my head as an task, and thought that a future would simply be a pair of Action and Reaction methods (both of the Action delegate type).  The Action would execute and could do whatever it liked, including evaluate some expression and store it in a variable.  If the Action completed, the Reaction method (a continuation) would run, and these could be specified using lambdas.  Because I was storing the results in a local variable (result), swallowed up and made accessible with a closure, I didn’t see a need for a Value property in the future and therefore no need to make the type generic.  Ultimately I thought it silly to have a Reaction method, since anything you needed to happen sequentially after a successful Action, you could simply store at the end of the Action method itself.

FutureTask task = new FutureTask(
    () => result = CalculatePi(10),
    () => new FutureTask(
        () => result += "...",
        () => Console.WriteLine(result),
        ex2 => Console.WriteLine("ex2: " + ex2.Message)),
    ex1 => Console.WriteLine("ex1: " + ex1.Message));

The syntax is what I was most concerned with, and as I started playing around with nesting of futures to compose my calculation, I started to feel like I was onto something.  After all, it was almost starting to resemble some of the F# code I’ve been looking at, and I took that style of functional composition to be a good sign.  As you can see from the code above, I also include a constructor parameter of type Action<Exception> for compensation logic to run in the event that the original action fails.  (The result variable is a string, CalculatePi returns a string, and so the concatenation of the ellipsis really does make sense here.)

The problem that started nagging me was the thought that a composite computation of future objects might not be able to be defined all in one statement like this, not building the dependency tree from the bottom up.  You can really only define the most basic factors (the leaf nodes of a dependency tree) at the beginning this way, and then the expressions that depend directly upon those leaf nodes, etc.  What if you have 50 different starting future values, and you can only proceed with the next step in the calculation once 5 of those specific futures have completed evaluation?  How would you express those dependencies with this approach?

That’s when I started to think about futures as top-down hierarchical data container objects, instead of tasks that have pointers to some next task in a sequence.  I created a Future<T> class whose constructor takes an optional name (to aid debugging), a method of type Func<T> (which is a simple expression, supplied as a lambda in my examples), and finally an optional params list of other Future<T> objects on which that future value depends.

The first two futures in the code below start calculating pi (3.1415926535) and omega (which I made up to be a string of 9s).  They have no dependencies, so they can start calculating right away.  The paren future has two dependencies, supplied as two parameters at the end of the argument list: pi and omega.  You can see that the values pi.Value and omega.Value are used in the expression, which will simply surround the concatenated string value with parentheses and spaces.

var pi = new Future<string>("pi", () => CalculatePi(10));
var omega = new Future<string>("omega", () => CalculateOmega());

var paren = new Future<string>("parenthesize", () => Parenthesize(pi.Value + " < " + omega.Value), pi, omega);

var result = new Future<string>("bracket", () => Bracket(paren.Value), paren);

Finally, the result future has a dependency on the paren future.  This surrounds the result of paren.Value with brackets and spaces.  Because the operations here are trivial, I’ve added Thread.Sleep statements to all of these methods to simulate more computationally expensive work.

Dependencies Among Futures

The program starts calculating pi and omega concurrently, and then immediately builds the paren future, which because of its dependencies waits for completion of the pi and omega futures.  But it doesn’t block the thread.  Execution continues immediately to build the result future, and then moves on to the next part of the program.  When each part of the expression, each future, completes, it will set a Complete boolean property to true and invoke a Completed event.  Any attempt to access the Value property of one of these futures will block the thread until it (and all of the futures it depends on) have completed evaluation.

Furthermore, if an exception occurs, all of the futures that depend on it will no longer attempt to evaluate, and the exceptions will be thrown as part of an AggregateException when accessing the Value property.  This AggregateException contains all of the individual exceptions that were thrown as part of evaluating each future expression.  If both pi and omega fail, result should be able to hand me a list of all Exceptions below it in the tree structure that automatically gets formed.

There are two bits of code I added as icing on this cake.  The first is the use of the implicit operator to convert a variable of type Future<T> to type T.  In other words, if you have a Future<string> called result, you can now pass result into methods where a string parameter is expected, etc.  In the code listing at the end of the article, you’ll notice that I reference pi and omega instead of pi.Value and omega.Value (as in the code snippet above).

public static implicit operator T(Future<T> Future)
{
    return Future.Value;
}

The other helpful bit is an override of ToString, which allows you to hover over a future variable in debug mode and see its name (if you named it), whether it’s Complete or Incomplete, and any errors encountered during evaluation.

public override string ToString()
{
    return Name + ", " + (Complete ? "Complete" : "Incomplete") + (Error != null ? "Error=" + Error.Message : string.Empty);
}

Debug Experience of Future

What I’d really like to do is have the ability to construct this composite expression in a hierarchical form in the language, with a functionally composed syntax, replacing any parameter T with a Future<T>, something like this:

var result = new Future<string>("bracket", () => Bracket(
    new Future<string>("parenthesize", () => Parenthesize(
        new Future<string>("pi", () => CalculatePi(10))
        + " < " +
        new Future<string>("omega", () => CalculateOmega())
    ))
));

The Bracket and Parenthesize methods both require a string, but I give them an object that will at some point (“in the future”) evaluate to a string.  Another term used for future is promise, although there is a distinction in some languages that support both, but you can think in terms of giving those methods the promise that they’ll get a string later, at which time they can proceed with their own evaluation.  This effectively creates lazy evaluation, sometimes referred to as normal-order evaluation.

There are a few problems with this code, however.  First of all, though it’s composed functionally from the top down and returns the correct answer, it takes too long to do it: about 8 seconds instead of 4.  That means it’s processing all of the steps sequentially.  This happens because the future objects we’re handing to the Parenthesize and Bracket methods have to be converted from Future<string> to string before they can be evaluated in the expression, and doing that activates the implicit operator, which executes the Value property getter.  This completely destroys the asynchronous behavior we’re going for, by insisting on resolving it immediately with the wait built-into the Value property.  The string concatenation expression evaluates sequentially one piece at a time, and when that’s done, the next level up evaluates, and so on.

The solution is to declare our futures as factors we depend on at each level, which start them executing right away due to C#’s order of evaluation, and declare the operations we want to perform in terms of those predefined futures.  After a few hours of rearranging definitions, declaration order, and experimenting with many other details (including a brief foray into being more indirect with Func<Future<T>>), this is the working code I came up with:

Future<string> FuturePi = null, FutureOmega = null, FutureConcat = null, FutureParen = null;

var result = new Future<string>(
    () => Bracket(FutureParen),
    (FutureParen = new Future<string>(
        () => Parenthesize(FutureConcat),
        (FutureConcat = new Future<String>(
            () => FuturePi + " < " + FutureOmega,
            (FuturePi = new Future<string>(() => CalculatePi(10))),
            (FutureOmega = new Future<string>(() => CalculateOmega()))
        ))
    ))
);

In F# and other more functional languages, I imagine we could use let statements to define and assign these variables as part of the overall expression, instead of having to define the variables in a separate statement as shown here.

The Future<T> class I wrote works fairly well for exploration and study of futures and the possible syntax to define them and access their values, and I’ll share it so that you can experiment with it if you like, but understand that this is (even more so than usual) not production ready code.  I’m making some very naive assumptions, not taking advantage of any task managers or thread pools, there is no intelligent scheduling going on, and I haven’t tested this in any real world applications.  With that disclaimer out of the way, here it is, complete with the consuming test code.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Linq;

namespace FutureExpressionExample
{
    class Program
    {
        static void Main(string[] args)
        {
            DateTime StartTime = DateTime.Now;

            Future<string> FuturePi = null, FutureOmega = null, FutureConcat = null, FutureParen = null;

            var result = new Future<string>("bracket",
                () => Bracket(FutureParen),
                (FutureParen = new Future<string>("parenthesize",
                    () => Parenthesize(FutureConcat),
                    (FutureConcat = new Future<String>("concat",
                        () => FuturePi + " < " + FutureOmega,
                        (FuturePi = new Future<string>("pi", () => CalculatePi(10))),
                        (FutureOmega = new Future<string>("omega", () => CalculateOmega()))
                    ))
                ))
            );

            /* Alternative
             
            // first group of expressions evaluating in parallel
            var pi = new Future<string>("pi", () => CalculatePi(10));
            var omega = new Future<string>("omega", () => CalculateOmega());

            // a single future expression dependent on all of the futures in the first group
            var paren = new Future<string>("parenthesize", () => Parenthesize(pi + " < " + omega), pi, omega);
                        
            // another single future expression dependent on the paren future
            var result = new Future<string>("bracket", () => Bracket(paren), paren);

            */

            Console.WriteLine("Do other stuff while calculation occurs...");

            try
            {
                Console.WriteLine("\n" + result);
            }
            catch (AggregateException ex)
            {
                Console.WriteLine("\n" + ex.Message);
            }

            TimeSpan ts = DateTime.Now - StartTime;
            Console.WriteLine("\n" + ts.TotalSeconds.ToString() + " seconds");

            Console.ReadKey();
        }

        static string CalculatePi(int NumberDigits)
        {
            //throw new ApplicationException("Failed to calculate Pi");
            Thread.Sleep(3000);
            return "3.1415926535";
        }

        static string CalculateOmega()
        {
            //throw new ApplicationException("Failed to calculate Omega");
            Thread.Sleep(3000);
            return "999999999999999";
        }

        static string Parenthesize(string Text)
        {
            Thread.Sleep(500);
            return "( " + Text + " )";
        }

        static string Bracket(string Text)
        {
            Thread.Sleep(500);
            return "[ " + Text + " ]";
        }
    }

    public class Future<T> : IDisposable
    {
        public string Name { get; set; }
        public bool Complete { get; protected set; }
        public Exception Error { get; protected set; }

        protected Func<T> Expression { get; set; }

        protected List<Future<T>> Factors;
        protected List<Future<T>> FactorsCompleted;
        protected List<Future<T>> FactorsFailed;

        public event Action<Future<T>> Completed;
        protected void OnCompleted()
        {
            Complete = true;

            if (Completed != null)
                Completed(this);
        }

        private T _Value;
        public T Value
        {
            get
            {
                // block until complete
                while (!Complete)
                {
                    Thread.Sleep(1);
                }

                if (Exceptions.Count > 0)
                    throw new AggregateException(Exceptions);

                return _Value;
            }
            private set { _Value = value; }
        }

        public List<Exception> Exceptions
        {
            get
            {
                var list = new List<Exception>();

                foreach (Future<T> Factor in Factors)
                {
                    list.AddRange(Factor.Exceptions);
                }

                if (Error != null)
                    list.Add(Error);

                return list;
            }
        }

        public static implicit operator T(Future<T> Future)
        {
            return Future.Value;
        }

        // naming a Future is optional
        public Future(Func<T> Expression, params Future<T>[] Factors) : this("<not named>", Expression, Factors) { }

        public Future(string Name, Func<T> Expression, params Future<T>[] Factors)
        {
            this.Name = Name;
            this.Expression = Expression;
            this.Factors = new List<Future<T>>(Factors);
            
            FactorsCompleted = new List<Future<T>>();
            FactorsFailed = new List<Future<T>>();

            foreach (Future<T> Factor in this.Factors)
            {
                if (Factor.Complete)
                    FactorsCompleted.Add(Factor);
                else
                    Factor.Completed += new Action<Future<T>>(Factor_Completed);
            }

            // there may not be any factors, or they may all be complete
            if (FactorsCompleted.Count == this.Factors.Count)
                Expression.BeginInvoke(ReceiveCallback, null);
        }

        private void Factor_Completed(Future<T> Factor)
        {
            if (!FactorsCompleted.Contains(Factor))
                FactorsCompleted.Add(Factor);

            if (Factor.Error != null && !FactorsFailed.Contains(Factor))
                FactorsFailed.Add(Factor);

            Factor.Completed -= new Action<Future<T>>(Factor_Completed);

            if (Exceptions.Count > 0)
            {
                Dispose();
                OnCompleted();
                return;
            }

            if (FactorsCompleted.Count == Factors.Count)
                Expression.BeginInvoke(ReceiveCallback, null);
        }

        private void ReceiveCallback(IAsyncResult AsyncResult)
        {
            try
            {
                Value = Expression.EndInvoke(AsyncResult);
            }
            catch (Exception ex)
            {
                Error = ex;
            }
            
            Dispose();

            // computation is completed, regardless of whether it succeeded or failed
            OnCompleted();
        }

        public void Dispose()
        {
            foreach (Future<T> Factor in Factors)
            {
                Factor.Completed -= new Action<Future<T>>(Factor_Completed);
            }
        }

        // helpful for debugging
        public override string ToString()
        {
            return Name + ", " + (Complete ? "Complete" : "Incomplete") + (Error != null ? ", Error=" + Error.Message : string.Empty);
        }
    }

    public class AggregateException : Exception
    {
        public List<Exception> Exceptions;

        public AggregateException(IEnumerable<Exception> Exceptions)
        {
            this.Exceptions = new List<Exception>(Exceptions);
        }

        public override string Message
        {
            get
            {
                string message = string.Empty;
                foreach (Exception ex in Exceptions)
                {
                    message += ex.Message + "\n";
                }
                return message;
            }
        }
    }
}
About these ads

One Response to “Concurrency & Coordination With Futures in C#”

  1. henrik said

    Race conditions.
    * Complete event. See discussion in your linked article. Contrary to their post-CTP version, you don’t fire the event for late subscribers and there are no other ways of subscribing than doing it after posting a computation that will try to evaluate directly.

    * Complete event II. Your checking if it’s null isn’t thread safe since another thread might unsubscribe after your thread has determined it’s safe to call the multicast delegate. You should assign a stack-local reference and use that to store the copy (multicast delegates are immutable) of the event handler before firing it.

    * FactorsFailed.Add(Factor); This is not a thread-safe call, so you are forced to lock around the add. Try yourself, to add a million items from many threads and see if the sum of all items is correct, without locking. Since you have a collection of dependencies/factors, their results can come in concurrently enough for list’s add to be pre-empted.

    * A busy-wait like this:
    // block until complete
    while (!Complete)
    {
    Thread.Sleep(1);
    }
    does not garantuee neither thread yeilding — for two reasons; 1) threads can’t be yielded when in the thread pool, 2) it’s then Sleep(0) you should call, nor does is it a very good way of doing it, because your bool is not marked to be volatile, so you might have different thread local storage information about it, or different orderings.

    Instead, use a manual reset event. So it becomes like this:
    private void OnCompleted()
    {
    IsCompleted = true;

    _Completed.Set();

    Completed.AtomicallyFire(this);
    }
    and
    ///
    /// Because the parameter evtHandler is a multicast-delegate which is
    /// immutable, it is copied in this method
    ///
    public static void AtomicallyFire(this Action evtHandler, T arg)
    {
    if (evtHandler == null) return;
    evtHandler(arg);
    }

    * Design issue: it would be nice to have an IList in your exception, instead of binding its implementation to the generic collection library. I e.g. use C5 a lot.

    Other than this, it’s a really nice piece of code and I have taken inspiration from it! :)

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

 
Follow

Get every new post delivered to your Inbox.

%d bloggers like this: