Using async/await with a custom thread pool

Published on Author mzabskyLeave a comment

Async/await has a potential to dramatically simplify any C# code that relies on asynchronous methods. Its downside is that it is heavily tied into the threading model of whatever kind of application you are developing: console (system thread pool) vs. WPF vs. WinForms etc. – each of these execute the continuations somewhat differently. This article demonstrates how to control this behavior, so that you can direct the continuations to threads of your choosing.

A custom thread pool

Why would you want to implement such a thread pool? Why not just use the default one? Let’s name a few reasons:

  • You may need finer control over which tasks execute (eg. by giving priorities to individual tasks).
  • You may need better load control (eg. by suspending some of the pooled threads in the case of heavy system load).
  • You may not want anyone else’s tasks in your pool.
  • You may need tighter logging and monitoring.
  • You may need to execute common logic before/after executing each task.

Another case where you might need to control thread continuations is when you develop a custom GUI framework – you would need to direct all the continuations to your GUI thread.

Let’s start by specifying what I mean by a custom thread pool. In this article, I will work with a simple thread pool that supports task priorities – before a task of a lower priority is executed, there must not be any tasks of a higher priority in the queue.

“API” of the thread pool might look like this:

You can find full code here.

Executing continuations on our pooled threads

Execution of continuations is controlled by a synchronization context of the thread, which invoked the await. This means we will need to derive our own synchronization context and activate it for each of our pooled threads. Looking at documentation of the SynchronizationContext class, it has quite a few virtual methods. Fortunately, not all of these are used in context of async/await. We will need to implement following methods:

  • Post – This is the key method that receives a delegate and executes it on a thread, asynchronously.
  • CreateCopy – This doesn’t appear to be used in most circumstances, but some library functions do rely on it (I would receive exceptions from Windows Communication Foundation remote calls unless I implemented this properly).

Notably, methods Wait and Send don’t need to be implemented in our subclass. Wait is not used and Send has a correct implementation in the base class.

Our implementation will need to store a reference to the CustomThreadPool in its constructor and its Post method will then schedule the continuation as a task. It might look like this:

For now, one instance of the synchronization context per thread pool will be good enough, so we can create it in the pool’s constructor. We then need to assign this synchronization context to each of the pooled threads, which is done by invoking a static method SynchronizationContext.SetSynchronizationContext before the “while(true)” loop in our thread body code:

Even though .Net documentation doesn’t make it terribly clear, sadly, the SetSynchronizationContext method stores the context into a thread-local storage, so it needs to be called on each thread separately. This has a clear advantage: you could use CustomThreadPool even in an WPF app: the GUI thread is managed by its DispatcherSynchronizationContext while our threads are managed by us.

Maintaining task priority

The above solution has one major issue: all continuations are executed with default priority. Even if user schedules a task with high priority, the Post method of our synchronization context doesn’t know that. We will need to pass information about the original task’s priority to it somehow.

The Post method however doesn’t receive any information but a delegate (which is generally some compiler generated method) and a state object (which is similarly cryptic and can’t be controlled by us).

Therefore, we will store the information into the one place we can control: the synchronization context object itself. To do that, we will simply add a priority field to it:

Having one synchronization context is not good enough any more, we will need at least one per priority. For the sake of simplicity, we will make a new synchronization context for each task (this would likely be necessary if we wanted to store even more information into it). The synchronization context will have to be assigned to its thread every time a task executes:

Conclusion

And with that, we are done. We can now await on asynchronous methods within the code of tasks executed in our thread pool:

Above code will print text similar to this:

You can find the final code along with a small testing application here.

Leave a Reply

Your email address will not be published. Required fields are marked *