DEV404 – Hardcore Workflow 4

Thanks to everyone who attended the DEV404 session at TechEd NZ. We wanted to cover off some new material that we haven’t seen else where and so Pete concentrated on the extensibility of the WorkflowServiceHost and the WorkflowServiceHostFactory. Before we got there I felt we needed a lead in and so I gave a brief overview of the workflow runtime, much of the material was covered in depth at PDC09 in the session Workflow 4 Inside Out.

The key point was the single threaded nature of the workflow scheduler. There is a single thread responsible for scheduling the execution of the activities in the activity tree, you really do not want to block this thread. This is the thread that runs the Execute method of synchronous activities, to show this in action I built the following workflow:

There’s a collection of strings populated with URLs of a few well known websites.

Then, there is a ParallelForEach that iterates over the collection and fetches the contents of the web page. The FetchUrl activity was written as follows:

using System.Activities;
using System.IO;
using System.Net;

namespace WorkflowRuntime.Activities {
    /// <summary>
    /// Fetch HTTP resource synchronously 
    /// </summary>
    public sealed class FetchUrlSync: CodeActivity {
        public InArgument Address { get; set; }
        protected override string Execute(CodeActivityContext context) {
            string address = context.GetValue(Address);
            string content = string.Empty;
            WebRequest request = HttpWebRequest.Create(address); 

            using(HttpWebResponse response = request.GetResponse() as HttpWebResponse) {
                if(response != null) {
                    using (Stream stream = response.GetResponseStream()) {
                        if (stream != null) {
                            StreamReader reader = new StreamReader(stream);
                            content = reader.ReadToEnd();
                        }
                    }
                }
            }
            return content;
        }
    }
}

The HttpWebRequest class is used to fetch the page contents. Running the workflow gives the following results:

The Urls are fetched one at a time, the same behavior that you would see if the activities were scheduled in a sequence rather than in a parallel. Why? This is the single threaded scheduler, it must wait for the Execute() method of the activity to complete before it can schedule the next activity.

What we want to see is:

So how can we achieve this? Well, we have to rewrite the FetchUrl activity to perform its work asynchronously. The HttpWebRequest already has async support via the BeginGetResponse and EndGetResponse method pairs; this is a standard pattern in .NET for async programming. The FetchUrl activity becomes:

using System;
using System.Activities;
using System.IO;
using System.Net;

namespace WorkflowRuntime.Activities {
    /// <summary>
    /// Fetch HTTP resource asynchronously
    /// <summary>
    public sealed class FetchUrlAsync: AsyncCodeActivity {
        public InArgument Address { get; set; }

        protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state) {
            string address = context.GetValue(Address);
            WebRequest request = HttpWebRequest.Create(address);
            context.UserState = request;
            return request.BeginGetResponse(callback, state);
        }

        protected override string EndExecute(AsyncCodeActivityContext context, IAsyncResult result) {
            string content = string.Empty; WebRequest request = (WebRequest) context.UserState;
            using (HttpWebResponse response = request.EndGetResponse(result) as HttpWebResponse) {
                if (response != null) {
                    using (Stream stream = response.GetResponseStream()) {
                        if (stream != null) {
                            StreamReader reader = new StreamReader(stream);
                            content = reader.ReadToEnd();
                        }
                    }
                }
            }
            return content;
        }
    }
}

We call the BeginGetResponse method passing in the callback and state object given to us by the workflow runtime as part of the AsyncCodeActivity.BeginExecute method. When the fetch is completed, by a separate worker thread, the workflow runtime is called back and the EndExecute method is invoked. In this method we take the resultant stream and read the contents into a string that we return. The workflow scheduler thread is no longer responsible for fetching the content, therefore it can schedule the fetch of the next Url and we get the parallel behavior we expect. All fetches are scheduled and then the workflow runtime waits to be called back by each worker thread when complete.

The time taken for the synchronous fetches to complete is the sum total of all fetches. For the asynchronous fetches, it is the time of the longest fetch plus a little overhead.

A basic rule of workflow is to perform I/O asynchronously and not to block the scheduler thread.

The sample code and PPT deck is available from https://public.me.com/stefsewell

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: