Thursday, July 17, 2008

Thread Synchronized Queuing

Today's topic gets back to some multi-threading. I find it very useful to have threads pull work from a queue and have one master assignment manager feed the queue. However, the standard System.Collections.Generic.Queue<T> is a little inadequate for me because it is not fully thread-safe. I found it a little disheartening that the Queue<T> class members are not overridable either. So we write our own. Here are my requirements...


  • The queue must synchronize multiple writer threads writing elements to the queue.

  • The queue must synchronize multiple reader threads reading from the queue.

  • If there is nothing in the queue, a reader thread must wait for something or timeout.

  • Those waiting on the queue can be “interrupted”.

  • Of course, no two readers will ever get the same element off the queue.



    So let's start coding...

    I start with a new class called SyncQueue in my own namespace, and I make it generic.


    namespace QueueExample
    {
        public class SyncQueue<T>
        {
            public void Enqueue(T element)
            {
                // we'll get to it
            }
            public T Dequeue()
            {
                return Dequeue(-1); // wait forever
            }
            public T Dequeue(int timeout_milliseconds)
            {
                // we'll get to this later
                return default(T); // null for reference types
            }
            public void Interrupt()
            {
            }
            public T Peek()
            {
                return default(T);
            }
            public int Count { get { return 0; } }
        }
    }


    So there's the basic skeleton of the queue. You can add elements with Enqueue(), remove elements with Dequeue(), and even look at the next element with Peek(). There is an operation for interrupting waiting threads called Interrupt(), and for good measure, I will be able to check how many elements are on the queue with a Count property.

    But how will we synchronize the threads, you ask? We will make use of a couple of synchronization tools at our disposal. After adding the namespace System.Threading, we have two WaitHandles available to us, AutoResetEvent and ManualResetEvent. So I add them to my class in a private array of WaitHandles called "handles":



    private WaitHandle[] handles = {
        new AutoResetEvent(false),  // handles[0]: an element is available
        new ManualResetEvent(false) // handles[1]: waiting readers are interrupted
    };


    The other item we will need is a lock, so that we can protect the queue from multiple threads tampering with its critical members at the same time. Oh, and lest we forget, we need to add a queue. So I add a System.Collections.Generic.Queue<T> to my class and call it _q.


    private Queue<T> _q = new Queue<T>();



    And now when I need to access a member of _q, I will wrap the critical section with a lock(_q) { } block. So, let's start with something real easy. Let's start with Count.


    public int Count { get { lock(_q){ return _q.Count; } } }



    Count gets the _q.Count value after first gaining exclusive access to the queue. In practice it hardly matters, because as soon as the lock(_q) is released, the count can change. The value is very dynamic, so its instantaneous reading is of limited use. The lock could arguably be omitted, since holding it does not make the returned value any less stale.

    Next, let's look at Peek(). It returns the value of the next item on the queue. But unless there is some form of thread synchronization external to the queue class, peeking at the queue is just for fun, because by the time you actually try to dequeue something, the one you peeked at will probably be gone. But here it is anyway, in case you have some additional synchronization you'd like to add.


    public T Peek()
    {
        lock (_q)
        {
            if (_q.Count > 0)
                return _q.Peek();
        }
        return default(T);
    }


    I haven't mentioned it yet, but notice that there is no unlock() that one must call. The lock() is convenient because it effectively implements a try/finally block with a “Monitor” that looks like this...


    Monitor.Enter(_q);
    try {
        // do your stuff
    }
    finally {
        Monitor.Exit(_q);
    }


    Thus it doesn't matter whether you return or throw an exception; the “finally” block always runs and releases the lock. Monitors have some other nice features, but we'll leave that for another day.

    We still haven't used that “handles” array I created, nor have we implemented Enqueue, Dequeue or Interrupt, so let's move on. Next, let's look at Enqueue.
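To see the release-on-exception behavior for yourself, here is a minimal sketch. The names LockReleaseDemo, ThrowInsideLock and CanAnotherThreadLock are made up for this illustration and are not part of the queue class. The probe runs on a second thread because a Monitor is reentrant: the thread that owns a lock can always enter it again, so testing from the same thread would prove nothing.

```csharp
using System;
using System.Threading;

public static class LockReleaseDemo
{
    private static readonly object _gate = new object();

    // Throws while holding the lock; the finally generated by lock() releases it.
    public static void ThrowInsideLock()
    {
        lock (_gate)
        {
            throw new InvalidOperationException("failure inside the critical section");
        }
    }

    // Returns true if a different thread can take the lock right now.
    public static bool CanAnotherThreadLock()
    {
        bool acquired = false;
        Thread probe = new Thread(() =>
        {
            // Timeout of 0: try once and give up immediately if the lock is held.
            if (Monitor.TryEnter(_gate, 0))
            {
                acquired = true;
                Monitor.Exit(_gate);
            }
        });
        probe.Start();
        probe.Join();
        return acquired;
    }

    public static void Main()
    {
        try { ThrowInsideLock(); }
        catch (InvalidOperationException) { /* expected */ }

        Console.WriteLine(CanAnotherThreadLock()); // True: the lock was released
    }
}
```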


    public void Enqueue(T element)
    {
        lock (_q)
        {
            _q.Enqueue(element);
            ((AutoResetEvent)handles[0]).Set();
        }
    }


    Enqueue locks the queue, adds the element, and then signals the AutoResetEvent we set up above. The AutoResetEvent was created in the non-signaled (reset) state, so any thread attempting to remove an item before anything is added has to wait. Calling Set() releases one waiting thread to attempt to lock and read the queue; the event then resets immediately, so only one waiting thread is released.

    Now for Dequeue. It needs to lock the queue, check if there are any elements, and if not, unlock the queue and wait for something to become available. Note that Dequeue cannot lock the queue and then, while locked, wait for something to become available. With the queue locked, nothing would ever become available.


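    public T Dequeue(int timeout_milliseconds)
    {
        T element;
        try
        {
            while (true)
            {
                // Index 0 is the "element available" event; anything else is an interrupt or a timeout.
                if (WaitHandle.WaitAny(handles, timeout_milliseconds, true) == 0)
                {
                    lock (_q)
                    {
                        if (_q.Count > 0)
                        {
                            element = _q.Dequeue();
                            // More elements remain, so release the next waiting reader.
                            if (_q.Count > 0)
                                ((AutoResetEvent)handles[0]).Set();
                            return element;
                        }
                    }
                    // Another reader took the element first; go back and wait.
                }
                else
                {
                    return default(T);
                }
            }
        }
        catch (Exception)
        {
            return default(T);
        }
    }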


    That's a lot more complex than the rest of the methods, so let's look at it in detail, starting from the outermost block and working in. We start with a “try” block because we are going to be doing things that are known to cause exceptions from time to time. Specifically, we will be calling WaitAny(), which throws an exception if a thread aborts while holding a synchronization handle we need. WaitAny can throw an exception for several other reasons, too. So we need to deal with the possibility. Just returning the default value is not the best response; you should expand this section.

    The next step down is a loop where threads compete for the queue and its elements. The first step is to wait. Whenever something is added to the queue, the AutoResetEvent is Set, so we wait for something to be added. If something has already been added, the event will already be set when the thread gets to this point. The WaitAny call returns 0 if the AutoResetEvent is set, or 1 if the ManualResetEvent is set. It returns a value equal to WaitHandle.WaitTimeout if the operation times out. Therefore we are looking for 0 if we are to possibly take an element from the queue. Any other value falls into the else branch and returns the default value (null for reference types). This is the basis of the Interrupt() method.
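The three possible outcomes of WaitAny are easy to observe in isolation. This is a small sketch, not part of the queue class; WaitAnyDemo and Observe are names invented for the example, and the 50 ms timeout is arbitrary. The last case relies on the documented rule that when several handles are signaled, WaitAny reports the smallest index, which for this queue means a pending element is reported ahead of an interrupt.

```csharp
using System;
using System.Threading;

public static class WaitAnyDemo
{
    // Returns the WaitAny results for: nothing signaled, auto event signaled,
    // manual event signaled (twice), and both signaled.
    public static int[] Observe()
    {
        AutoResetEvent itemReady = new AutoResetEvent(false);
        ManualResetEvent interrupted = new ManualResetEvent(false);
        WaitHandle[] handles = { itemReady, interrupted };

        // Nothing is signaled, so the 50 ms wait expires.
        int timedOut = WaitHandle.WaitAny(handles, 50, false);

        // Index 0 signaled: WaitAny returns 0 and the event resets itself.
        itemReady.Set();
        int item = WaitHandle.WaitAny(handles, 50, false);

        // Index 1 signaled: returns 1 on every call until Reset() is called.
        interrupted.Set();
        int firstInterrupt = WaitHandle.WaitAny(handles, 50, false);
        int secondInterrupt = WaitHandle.WaitAny(handles, 50, false);

        // Both signaled: the smallest index is reported.
        itemReady.Set();
        int both = WaitHandle.WaitAny(handles, 50, false);

        return new[] { timedOut, item, firstInterrupt, secondInterrupt, both };
    }

    public static void Main()
    {
        int[] r = Observe();
        Console.WriteLine(r[0] == WaitHandle.WaitTimeout); // True
        Console.WriteLine(r[1]); // 0
        Console.WriteLine(r[2]); // 1
        Console.WriteLine(r[3]); // 1
        Console.WriteLine(r[4]); // 0
    }
}
```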

    Now if a 0 is returned, there may be something on the queue and the thread is released to try and lock the queue. It is possible that other items were enqueued as well resulting in multiple threads trying to get to the queue, so the lock(_q) is again necessary. Upon obtaining the lock, there is a possibility that there are no elements on the queue, so we first check the count and only dequeue if the count is greater than 0. If nothing is there, we go back to the WaitAny and wait again. After taking an element off the queue, we again check the count and if there are still elements, we Set the AutoResetEvent so the next waiting thread can proceed.

    Last, we have the Interrupt() method. The Interrupt() method is now very easy.


    public void Interrupt()
    {
        ((ManualResetEvent)handles[1]).Set();
    }


    The Interrupt() method uses the ManualResetEvent to signal waiting threads. This type of event stays signalled until manually reset. So all threads waiting, and any threads coming to the WaitAny, will immediately release and fall into the else condition. Note that Interrupt and Timeout are indistinguishable based upon the return value. Your thread architecture will need to deal with the higher-level issues of interrupting threads.

    I moved through the dequeue code pretty quickly, and you may be wondering why the dequeue fires the AutoResetEvent at all. Maybe you think that if it didn't fire the event, there wouldn't be any competition for the queue, and then there wouldn't be any need to check whether there is anything on the queue. You may ask: isn't it guaranteed that there is something on the queue if you get the event signal? No, it's not.
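If your architecture does need to tell an interrupt from a timeout, one possible variation is to report a status and hand the element back through an out parameter. The sketch below is only that, a sketch: DequeueStatus, StatusQueue and TryDequeue are hypothetical names that do not appear in the SyncQueue listing, and the class is trimmed to the members the example needs.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical status type, not part of the original SyncQueue.
public enum DequeueStatus { Success, TimedOut, Interrupted }

public class StatusQueue<T>
{
    private readonly AutoResetEvent _itemReady = new AutoResetEvent(false);
    private readonly ManualResetEvent _interrupted = new ManualResetEvent(false);
    private readonly Queue<T> _q = new Queue<T>();

    public void Enqueue(T element)
    {
        lock (_q)
        {
            _q.Enqueue(element);
            _itemReady.Set();
        }
    }

    public void Interrupt() { _interrupted.Set(); }

    // Same wait-then-lock loop as Dequeue, but the three outcomes stay distinct.
    public DequeueStatus TryDequeue(int timeoutMilliseconds, out T element)
    {
        WaitHandle[] handles = { _itemReady, _interrupted };
        element = default(T);
        while (true)
        {
            int signaled = WaitHandle.WaitAny(handles, timeoutMilliseconds, false);
            if (signaled == WaitHandle.WaitTimeout) return DequeueStatus.TimedOut;
            if (signaled == 1) return DequeueStatus.Interrupted;

            lock (_q)
            {
                if (_q.Count > 0)
                {
                    element = _q.Dequeue();
                    if (_q.Count > 0) _itemReady.Set(); // more work: wake the next reader
                    return DequeueStatus.Success;
                }
            }
            // Lost the race for the element; go back and wait again.
        }
    }
}

public static class StatusQueueDemo
{
    public static void Main()
    {
        StatusQueue<string> q = new StatusQueue<string>();
        string item;

        Console.WriteLine(q.TryDequeue(50, out item)); // TimedOut

        q.Enqueue("job-1");
        Console.WriteLine(q.TryDequeue(50, out item)); // Success
        Console.WriteLine(item);                       // job-1

        q.Interrupt();
        Console.WriteLine(q.TryDequeue(50, out item)); // Interrupted
    }
}
```

A worker loop can then exit on Interrupted and simply keep polling on TimedOut, instead of guessing what a default return value meant.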

    First, consider the very simple and likely case that several items are added to the queue before any thread gets an element from the queue. The AutoResetEvent will be Set() multiple times, but it will only allow one thread through. It does not keep track of how many elements there are; it is only a signal that there is at least one element on the queue. So if there are multiple elements on the queue, even though the AutoResetEvent has been Set() several times, the first thread that dequeues leaves the AutoResetEvent reset. Therefore that thread must signal again, or the queue will stop working.

    Now, consider the condition where an item is added to the queue while 3 threads already wait for an element. One thread releases on the event being signaled, but before it can obtain the lock, yet another thread writes another element to the queue and sets the event. This releases the second thread. The two free threads now compete for the lock, and only one gets through while the other waits at the lock. Now when the winner takes an element off the queue, it finds another element still there, so it signals the event again according to our first scenario. This releases the third thread to compete with the second thread over the single remaining item on the queue. Therefore, they must check the quantity before they try to take an element. One of the threads will lose and have to go back and wait.

    This queue is rather simple. It does not limit capacity or seriously block any writers from adding to the queue. The error checking is weak, but all in all, you should be able to glean a better understanding of the complexities of queue synchronization in a multi-threaded application. And you get some working code to start with, to boot! Here is the full listing ...
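The first scenario can be checked directly, without any threads at all. This is a minimal sketch; AutoResetCountDemo and WaitsReleased are names invented for the illustration. It signals an AutoResetEvent several times and then counts how many non-blocking waits succeed.

```csharp
using System;
using System.Threading;

public static class AutoResetCountDemo
{
    // Signals the event 'sets' times, then counts how many zero-timeout
    // waits succeed before the event reports non-signaled.
    public static int WaitsReleased(int sets)
    {
        using (AutoResetEvent ev = new AutoResetEvent(false))
        {
            for (int i = 0; i < sets; i++)
                ev.Set();

            int released = 0;
            while (ev.WaitOne(0, false))
                released++;
            return released;
        }
    }

    public static void Main()
    {
        Console.WriteLine(WaitsReleased(3)); // 1: three Set() calls, one release
    }
}
```

The event is a flag, not a counter, which is why Dequeue has to re-signal whenever it leaves elements behind.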


    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;

    namespace QueueExample
    {
        public class SyncQueue<T>
        {
            private WaitHandle[] handles = {
                new AutoResetEvent(false),
                new ManualResetEvent(false),
            };
            private Queue<T> _q = new Queue<T>();

            public int Count { get { lock (_q) { return _q.Count; } } }
            public T Peek()
            {
                lock (_q)
                {
                    if (_q.Count > 0)
                        return _q.Peek();
                }
                return default(T);
            }

            public void Enqueue(T element)
            {
                lock (_q)
                {
                    _q.Enqueue(element);
                    ((AutoResetEvent)handles[0]).Set();
                }
            }

            public T Dequeue(int timeout_milliseconds)
            {
                T element;
                try
                {
                    while (true)
                    {
                        if (WaitHandle.WaitAny(handles, timeout_milliseconds, true) == 0)
                        {
                            lock (_q)
                            {
                                if (_q.Count > 0)
                                {
                                    element = _q.Dequeue();
                                    if (_q.Count > 0)
                                        ((AutoResetEvent)handles[0]).Set();
                                    return element;
                                }
                            }
                        }
                        else
                        {
                            return default(T);
                        }
                    }
                }
                catch (Exception)
                {
                    return default(T);
                }
            }

            public T Dequeue()
            {
                return Dequeue(-1);
            }

            public void Interrupt()
            {
                ((ManualResetEvent)handles[1]).Set();
            }
            public void Uninterrupt()
            {
                // for completeness, lets the queue be used again
                ((ManualResetEvent)handles[1]).Reset();
            }
        }
    }


    For more on queuing, take a look at my article A Simple Task Queue.

    3 comments:

    Les Potter - Author said...

    Please forgive the missing "<" T ">" that has been stripped out of the class definition and out of the Queue declaration. I am still figuring out how to post code on my blog such that the < T > is not eaten.

    Les Potter - Author said...

    The < T > should be fixed now.

    Fredrik said...

    nice stuff! found it very informative.