Friday, July 18, 2008

[2008.07.18] Coordination Data Structures in System.Threading.Collections [Part II/II]

[3] System.Threading.Collections.BlockingCollection<T>

  • a blocking queue is a classic solution to many producer/consumer problems, where some threads produce work and other threads consume it
  • producers generate data and store it into a queue; consumers remove data from the queue and process it
  • such a queue is typically thread-safe so that producers and consumers can access it concurrently from multiple threads
Example:

// create an instance of BlockingCollection
var blockColl = new BlockingCollection<T>();

// now somewhere a thread can call Add to append
// items to the underlying data structure
// NOTE: by default, the BlockingCollection
// uses a ConcurrentQueue<T> FIFO structure
// as the underlying data storage structure
blockColl.Add(data);

// somewhere else another thread can call Remove
// this is guaranteed to return data because it
// will block until it has data to return
blockColl.Remove();

  • additionally, such a queue has blocking functionality built into it, so that consumers requesting data can block until data arrives
  • there are additional features one might want in a blocking collection:
    • just as consumers may want to block, waiting for data to arrive, producers may also want to block waiting for space to be available in the queue, a technique useful for throttling data production
    • another useful capability is signaling that no more data will be produced, such that consumers waiting for data to arrive don’t wait indefinitely if no more data is inbound
    • in some scenarios, such as pipelining, it is also useful to be able to add to or remove from any one of several blocking queues
  • BlockingCollection<T> has several constructor overloads:
    • [1] one overload sets an upper bound on the number of items in the collection; for instance, to hold at most 10 items, instantiate it as:

      var blockColl = new BlockingCollection<T>(10);

      • now if another thread comes along and tries to add an 11th item to the collection, it will block until a consumer removes an item
      • essentially, this throttles the producers in a producer/consumer relationship where the producer runs faster than the consumer
      • in this way, throttling keeps the producer from using up all the system resources
    • [2] another overload lets you choose your own underlying storage mechanism
      • the default implementation of the BlockingCollection uses the first-in-first-out (FIFO) ConcurrentQueue<T> data structure
      • however, FIFO behavior isn’t always the most desirable, so the collection can easily be switched to last-in-first-out (LIFO) behavior by passing in a ConcurrentStack<T>

        var blockColl =
            new BlockingCollection<T>
                (new ConcurrentStack<T>());

  • BlockingCollection<T> acts as a wrapper: it accepts as a constructor parameter any concurrent collection that implements the System.Threading.Collections.IConcurrentCollection<T> interface, and provides blocking and bounding capabilities on top of that collection
  • both the ConcurrentStack<T> and ConcurrentQueue<T> types implement IConcurrentCollection<T>, allowing them to be used with BlockingCollection<T>
    • however, custom implementations of IConcurrentCollection<T> can also be used
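The bounding, custom-storage, and "any one of several queues" features described above can be sketched roughly as follows. Note that this sketch assumes the API names that later shipped in .NET (the System.Collections.Concurrent namespace, Take rather than Remove, and IProducerConsumerCollection<T> as the storage interface), which differ from the CTP names used in this post. The class and method names are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper class; assumes the shipped System.Collections.Concurrent API,
// not the CTP-era System.Threading.Collections names used in the post.
public static class BlockingCollectionFeatures
{
    // Bounding: once `capacity` items are stored, a non-blocking TryAdd
    // is rejected (a plain Add would block the caller instead).
    public static bool TryAddBeyondCapacity(int capacity)
    {
        using (var bounded = new BlockingCollection<int>(capacity))
        {
            for (int i = 0; i < capacity; i++)
                bounded.Add(i);          // fills the collection without blocking
            return bounded.TryAdd(-1);   // returns false: no space left
        }
    }

    // Custom storage: backing the collection with a ConcurrentStack<T>
    // makes removal last-in-first-out.
    public static string[] TakeInLifoOrder(params string[] items)
    {
        using (var lifo = new BlockingCollection<string>(new ConcurrentStack<string>()))
        {
            foreach (var item in items)
                lifo.Add(item);

            var result = new string[items.Length];
            for (int i = 0; i < result.Length; i++)
                result[i] = lifo.Take();
            return result;
        }
    }

    // Pipelining: TakeFromAny removes an item from whichever of several
    // collections has one and returns the index of that collection.
    public static int TakeFromWhicheverHasData(out string item)
    {
        using (var first = new BlockingCollection<string>())
        using (var second = new BlockingCollection<string>())
        {
            second.Add("ready");   // only the second collection holds data
            return BlockingCollection<string>.TakeFromAny(new[] { first, second }, out item);
        }
    }
}
```

TryAdd is used in the first method only so that the bound can be observed without blocking the calling thread; in a real producer, Add would normally be the call that provides the throttling.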
Example: consider creating a blocking queue of strings:

private BlockingCollection<string> _data =
    new BlockingCollection<string>();

// can also be done by explicitly providing
// the underlying collection to be used:

private BlockingCollection<string> _data =
    new BlockingCollection<string>
        (new ConcurrentQueue<string>());

producer threads can now add data to the queue:

private void Producer()
{
    while(true)
    {
        string s = ...;
        _data.Add(s);
    }
}

while one or more consumer threads are removing data from the queue, blocking as necessary until data is available:

private void Consumer()
{
    while(true)
    {
        string s = _data.Remove();
        UseString(s);
    }
}

  • such a consumer loop can be made simpler by taking advantage of BlockingCollection<T>’s GetConsumingEnumerable method
  • this method returns an IEnumerable<T> that calls Remove under the covers, removing the next element from the collection on each call to the enumerator's MoveNext method (System.Collections.IEnumerator.MoveNext)
  • this allows a BlockingCollection<T> to be consumed with standard constructs such as a foreach loop or a PLINQ query
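As a rough illustration of the simplified consumer loop, the sketch below combines GetConsumingEnumerable with the "no more data" signal mentioned earlier. It assumes the API that later shipped in .NET (System.Collections.Concurrent, CompleteAdding, and Task.Run from .NET 4.5), not the CTP described in this post. The class name, method name, and item strings are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch; assumes the shipped System.Collections.Concurrent API.
public static class ConsumingEnumerableSketch
{
    public static List<string> RunPipeline(int itemCount)
    {
        var consumed = new List<string>();

        // Bounded to 4 items so that a fast producer is throttled.
        using (var data = new BlockingCollection<string>(boundedCapacity: 4))
        {
            var producer = Task.Run(() =>
            {
                for (int i = 0; i < itemCount; i++)
                    data.Add("item " + i);   // blocks while 4 items are pending
                data.CompleteAdding();       // signals that no more data is coming
            });

            // The foreach blocks while the collection is empty and ends once
            // CompleteAdding has been called and every item has been removed.
            foreach (string s in data.GetConsumingEnumerable())
                consumed.Add(s);

            producer.Wait();
        }

        return consumed;
    }
}
```

Without the call to CompleteAdding, the foreach would wait indefinitely after the last item, which is the situation the completion signal is meant to avoid.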
