Monday, July 7, 2008

[2008.07.07] Coordinate Data Structures in System.Threading [Part II/III]

[4] System.Threading.SemaphoreSlim

  • the System.Threading.SemaphoreSlim class provides functionality that allows a developer to limit the number of threads that can access concurrently a resource or pool of resources as well as limit the costs associated with the .NET 2.0 version of a Semaphore
  • like ManualResetEvent, Semaphore is a thin wrapper around underlying kernel objects
Example: use a SemaphoreSlim to build a blocking queue

class BlockingQueue<T>

{ // start Class BlockingQueue<T>

    // create a generic Queue to model the Blocking Queue

    private Queue<T> _queue = new Queue<T>();

    // create a SemaphoreSlim object and specify the

    //  initial number of requests that can be granted

    //  concurrently

    private SemaphoreSlim _semaphore =

        new SemaphoreSlim(0);

 

    public void Enqueue(T data)

    {

        if (data == null) throw new

            ArgumentNullException("data");

        // lock the queue to prevet other threads from

        //  accessing it during this operation

        lock (_queue)

            _queue.Enqueue(data); // add data to queue

        // exits the semaphore and returns the value

        //  of SemaphoreSlim.CurrentCount which

        //  basically counts the number of operations

        //  performed thus far

            _semaphore.Release();

    }

 

    public T Dequeue()

    {

        // if CurrentCount > 0, decrement it by one

        //  and perform the Dequeue operation as this means

        //  we have data in the queue we can process

        // if CurrentCount = 0, blocks the current

        //  thread until it's greater than zero as this means

        //  that the queue does not have data to Dequeue so

        //  we will just block the thread this operation

        //  is running on, waiting until more data is

        //  added to the queue

        _semaphore.Wait();

        // lock the queue to prevet other threads from

        //  accessing it during this operation

        lock (_queue)

            return _queue.Dequeue();

    }

} // endClass BlockingQueue<T>

[5] System.Threading.CountdownEvent

  • like ManualResetEvent and AutoresetEvent, System.Threading.CountdownEvent is a synchronization primitive that allows threads to signal the event and other threads to wait for it to be set
  • waits for a number of things to happen
  • the Wait() method blocks the current thread until the System.Threading.CountdownEvent is set
  • CountdownEvent is set when a certain number of threads have signaled the event, counting down from a predetermined value
  • so you may have 10 work items and can initialize the CountdownEvent item to 10
    • now every time something completes, it calls Decrement() and only at zero does this event get set
  • this is frequently useful in fork/join operations, where a number of asynchronous operations may happen in the background, and only when those operations have completed is some main thread of execution allowed to proceed
Example:

int n = 10;

using(var ce = new CountdownEvent(n))

{

    for(int i=0; i<n; i++)

    {

        ThreadPool.QueueUserWorkItem(delegate

        {

            ...

            // registers a signal with the CountdownEvent by

            //  decrementing its count

            ce.Decrement();   

 

        });

    }

        // blocks the current thread until the CountdownEvent

        //  is set

        ce.Wait();

}

  • CountdownEvent is not limited to counting down from a preset value using Decrement()
  • it also provides an Increment() method that can be used to increment its current count if the count hasn't already reached 0
Example:

IEnumerable<T> src = ...;

using(CountdownEvent ce = new CountdownEvent(1))

{

    foreach(var element in src)

    {

        ce.Increment();

        ThreadPool.QueueUserWorkItem(state =>

        {

            Handle((T)state);

            ce.Decrement();

        }, element);

    }

    ce.Decrement();

    // block the main thread (i.e. the thread running

    //  the for loop) until all asynchronous work is done

    ce.Wait();              

}

  • in this example, CountdownEvent is initialized to a value of 1, representing the main thread (which runs the for loop) that’s spawning off the asynchronous work items
  • before each work item, Increment the count by 1, and when each work item finishes, it's decreased by one using the Decrement method
  • when the main thread finishes spawning work items, it also decrements the count by 1, and then Wait on the event for all work items to complete

No comments: