Передать на печать

Thread-safe структуры данных .NET 4 (ч.2)

BlockingCollection<T> - эта потокобезопасная коллекция называется блокирующей, поскольку действует по следующему принципу:

  • Если коллекция пустая, и некоторый код пытается извлечь элемент, то поток, в котором он выполняется, блокируется до появления хотя бы одного элемента;
  • Если коллекция уже содержит максимальное число элементов, то поток, в котором пытаются добавить новый элемент, блокируется до тех пор, пока место не будет освобождено.


Разумеется, это далеко не полное описание её возможностей. Давайте с ними ознакомимся. Начнём с базовых вещей. Изменение содержимого коллекции производится при помощи специальных методов:


ДобавлениеИзвлечение
Блокирующие методыНеблокирующие методы
Add(T);
Add(T, CancellationToken);
TryAdd(T)
TryAdd(T, Int32)
TryAdd(T, TimeSpan)
TryAdd(T, Int32, CancellationToken)

Take()
Take(CancellationToken)
TryTake(out T)
TryTake(out T, Int32)
TryTake(out T, TimeSpan)
TryTake(out T, Int32, CancellationToken)

Как видно, для добавления и извлечения элементов есть в т.ч. и неблокирующие методы (те, что начинаются с префикса"Try"). Т.е. возможен неблокирующий доступ к коллекции. При этом Try* метод вернет false, если элемент невозможно добавить или извлечь, и true в противном случае. Для обоих типов методов есть перегруженные версии с маркером отмены, с помощью которого можно асинхронно прекратить ожидание.

BlockingCollection<T> имеет механизм "завершения". Коллекция считается "завершенной", когда известно, что данные в неё добавляться больше не будут. После "завершения" все попытки добавить данные приведут к генерации исключения InvalidOperationException. Если попытаться извлечь данные из пустой "завершенной" коллекции, то также будет сгенерировано исключение. Сразу же после создания коллекция является "незавершенной", а её "завершение" выполняется посредством вызова метода CompleteAdding(). Таким образом, изменение состояния коллекции выполняется вручную. Зачем был придуман такой механизм? С его помощью обеспечивается синхронизация работы производителя и потребителя, т.е. участков кода, добавляющих и извлекающих данные из коллекции. Производитель может известить о том, что новые элементы он добавлять больше не будет. При этом потребитель будет знать, что не стоит ожидать пополнения коллекции. Ниже приведен пример, демонстрирующий подобный подход.

Давайте рассмотрим немного кода, работающего с BlockingCollection<T>. Сначала создадим экземпляр коллекции, причем в конструкторе укажем её максимальную ёмкость - в данном случае это 10 элементов. Впрочем, размер можно явно не задавать, вызвав конструктор без параметров - в таком случае коллекция будет расти "неограниченно":

// создаём коллекцию
BlockingCollection<int> collection = new BlockingCollection<int>(10);

10);[/code]
Далее в коллекцию добавим несколько элементов, и на консоль выведем свойства коллекции (о которых чуть позже):

[code]collection.Add(200);collection.Add(300);collection.Add(400);collection.Add(500); Console.WriteLine("Count: " + collection.Count);Console.WriteLine("BoundedCapacity: " + collection.BoundedCapacity);Console.WriteLine("IsCompleted: " + collection.IsCompleted);Console.WriteLine("IsAddingCompleted: " + collection.IsAddingCompleted);Console.WriteLine();[/code]
После этого запустим таймер, в делегате callback которого происходит добавление элементов и "завершение" коллекции:

[code]Timer timer = new Timer(delegate
{
Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine(DateTime.Now.ToLongTimeString() + " Добавление " + " 600"); collection.Add(600); Thread.SpinWait(300000000); Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine(DateTime.Now.ToLongTimeString() + " Добавление " + " 700"); collection.Add(700); collection.CompleteAdding();
},
null, 3000, Timeout.Infinite);[/code]
Делегат, указанный при создании таймера, будет вызван в потоке, отличном от того, в котором работает метод Main() - это позволит промоделировать доступ к коллекции со стороны нескольких потоков. Дальше в цикле начинаем перебор элементов (чтобы отличить вывод от разных потоков, используется различный цвет текста):

[code]foreach (var item in collection.GetConsumingEnumerable())
{
Console.ForegroundColor = ConsoleColor.Gray;
Console.WriteLine(DateTime.Now.ToLongTimeString() + " Получение " + item);
}[/code]
Затем снова выведем свойства коллекции на консоль. Вот скриншот с результатами работы программы:



Работает этот пример следующим образом. Создаётся коллекция и в неё помещается 4 элемента. После чего запускается таймер, и начинают одновременно работают 2 потока:

  • основной, где выполняется метод Main(). Здесь элементы изымаются из коллекции в цикле foreach;
  • поток таймера, где выполняется делегат callback таймера. Здесь элементы добавляются;


Те четыре элемента, которые были добавлены в самом начале, извлекаются за время менее 1 секунды. После этого основной поток блокируется, ожидая добавления новых элементов. Таймер запускается с задержкой в 3 секунды - и это видно на скриншоте: элемент 500 был извлечен в момент 06 сек., а элемент 600 был добавлен в 09 сек. В ту же 09 секунду цикл в основном потоке разблокируется, и выбирает только что добавленный элемент 600. В это время в потоке таймера создаётся искусственная задержка за счёт вызова SpinWait(). По её истечении в коллекцию добавляется элемент 700, который тут же извлекается в цикле основного потока. В заключении в потоке таймера происходит вызов collection.CompleteAdding(), который запрещает дальнейшее добавление элементов, т.е. "завершает" коллекцию. Поскольку к тому моменту коллекция пуста, и ожидание новых элементов становится бессмысленным, в основном потоке блокировка снимается и происходит выход из цикла foreach. Если закомментировать строчку с вызовом метода CompleteAdding(), то основной поток программы блокируется навсегда, ведь он будет ожидать поступления новых элементов, а поток таймера тем временем завершит работу. Т.о. выполнение "завершения" позволяет синхронизировать два параллельно работающих потока, и предотвратить подобное развитие событий.

Что касается свойств коллекции, то здесь наиболее интересны следующие:

  • IsAddingCompleted - возвращает true, если коллекция помечена как завершенная, т.е. дальнейшее добавление элементов запрещено, false в противном случае;
  • IsCompleted - возвращает true, если коллекция завершенная и все элементы выбраны, иначе false;
  • BoundedCapacity - указанная при создании емкость. Возвращает -1, если коллекция "безразмерная";


Ещё раз взглянув на скриншот, можно заметить, что в начале, когда в коллекцию добавили несколько элементов, оба логических свойства равны false, что вполне ожидаемо, ведь коллекция не "завершена". В конце работы, напротив, оба они равны true, поскольку в потоке таймера мы объявили коллекцию "завершенной" (т.е. IsAddingCompleted = true), а в основном потоке выбрали из неё все элементы (т.е. IsCompleted = true).

Возникает законный вопрос - а что с порядком элементов? Почему элементы извлекаются по принципу FIFO - это что, принцип работы коллекции? Или можно как-то на это повлиять? Ответ на эти вопросы даст более подробное рассмотрение перегруженного конструктора коллекции. В одной из его модификаций есть возможность указать ссылку на хранилище элементов - экземпляр класса, реализующего IProducerConsumerCollection<T>. Среди имеющихся в арсенале .NET 4 beta 2 средств это следующие структуры данных:

  • ConcurrentStack<T> - потокобезопасный стек;
  • ConcurrentQueue<T> - потокобезопасная очередь;
  • ConcurrentBag<T> - потокобезопасная неупорядоченная коллекция, допускающая дубликаты;


Т.е. структура данных BlockingCollection<T> реализует не хранилище, а только способ доступа с блокировкой и "завершением". Причем по умолчанию для хранения элементов используется очередь ConcurrentQueue<T>, отсюда и соблюдение принципа FIFO при работе с элементами. В рассмотренном выше примере мы можем заменить вызов конструктора следующим:

[code]BlockingCollection<int> collection =
new BlockingCollection<int>(new ConcurrentStack<int>(), 10);[/code]
т.е. использовать стек. Тогда получим такой результат:



Очевидно, что порядок извлечения элементов поменялся, и теперь он соответствует LIFO, что характерно для стека. Т.о. есть возможность реализовать хранилище данных, наиболее подходящее в данном конкретном случае, и использовать его в BlockingCollection<T>. Для этого достаточно наследовать интерфейс IProducerConsumerCollection<T>:

[code]public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{
void CopyTo(T[] array, int index);
T[] ToArray();
bool TryAdd(T item);
bool TryTake(out T item);
}[/code]
В заключение хочу отметить несколько любопытных статических методов класса BlockingCollection<T>. С их помощью можно добавлять или извлекать элементы, взаимодействуя с любой из коллекций, переданных в качестве параметра. Традиционно, есть 2 версии этих методов:

  • Блокирующие:
    int BlockingCollection<T>.AddToAny(BlockingCollection<T>[], T);
    int BlockingCollection<T>.TakeFromAny(BlockingCollection<T>[], out T);
  • Неблокирующие:
    int BlockingCollection<T>.TryAddToAny(BlockingCollection<T>[], T);
    int BlockingCollection<T>.TryTakeFromAny(BlockingCollection<T>[], out T);


Как видим, они принимают массив коллекций, и параметр - добавляемый элемент или out-параметр - извлекаемый. Возвращают эти методы индекс коллекции в массиве, в которую добавили данные, или, соответственно, извлекли. При этом блокирующие методы ждут появления свободного места в коллекции или нового элемента, блокируя при этом поток, в котором выполняются. Неблокирующие, в зависимости от вызванной перегруженной версии, сразу возвращают управление, либо "пытают счастья" указанный таймаут. Разумеется, в списке выше приведены не все сигнатуры методов - есть, например, принимающие маркер отмены, CancellationToken, подробнее лучше посмотреть MSDN.

Комментарии к 1-й части этого обзора побудили меня внимательнее отнестись к рассмотрению возвращаемых методами значений, и не зря. Изучая статические методы, я подумал - когда, например, TakeAnyFrom() вернёт -1? И, честно говоря, так и не смог придумать сценария. Если все коллекции массива-параметра пусты - он будет вечно ждать их пополнения или "завершения". Однако, если хотя бы одну из них завершить, получим ArgumentException. Если же положить в хотя бы одну коллекцию данные, метод вернёт её индекс и в out-параметре те самые данные. Рефлектор подсказал, что на самом деле все статические методы BlockingCollection<T> вызывают внутри один и тот же метод. Но я так и не понял, как при задании бесконечного таймаута неблокирующий метод может вернуть -1, поэтому переадресовал вопрос на форум MSDN. Для тех, кто заинтересовался, топик здесь, а вот тут соответствующая тема на MS Connect. Пока сошлись на том, что это просто ошибка в документации - копипаст с описания метода TryTakeAnyFrom().

Другие примеры использования BlockingCollection<T> можно найти на сайте MSDN Code Gallery. Там имеется страничка с исходными кодами, на которых демонстрируется использование тех или иных средств распараллеливания кода, входящих в .NET 4. На момент публикации примеры есть только для .NET 4 beta 1, но скоро опубликуют комплект и для beta 2. В составе этих примеров есть, например, набор методов-расширений, позволяющий "преобразовать" данную коллекцию в объект типа IProducerConsumerCollection<T>.

Хочу обратить внимание, что наиболее актуальная реализация рассматриваемой коллекции находится в составе .NET 4 beta 2, который стал доступен на днях. Если возникнет желание опробовать её в деле, рекомендую установить Visual Studio 2010 beta 2. По сравнению с прошлогодней июньской CTP-версией, работающей под .NET 3.5, некоторые методы были переименованы. Впрочем, это касается всей библиотеки Parallel Extensions. Полное описание BlockingCollection<T> можно найти в соответствующем разделе MSDN.

Vitus, TheVista.Ru Team
Ноябрь 2009

  Передать на печать





Все права принадлежат © MSInsider.ru и TheVista.ru, 2013
Сайт является источником уникальной информации о семействе операционных систем Windows и других продуктах Microsoft. Перепечатка материалов возможна только с разрешения редакции.
Работает на WMS 1.1 (Страница создана за 0.056 секунд)