Очередь поставщик/потребитель

Наше простое Wait/Pulse -приложение по сути – очередь поставщик/потребитель, реализованная нами ранее с использованием AutoResetEvent. Поставщик ставит в очередь задачи (обычно в главном потоке), в то время как один или более потребителей в рабочих потоках разбирают и выполняют их одну за другой.

В следующем примере для представления задачи будет использоваться строка. А очередь задач будет, соответственно, такой:

Queue<string> taskQ = new Queue<string>();

Поскольку очередь будет использоваться из нескольких потоков, необходимо обернуть в lock все операции чтения и записи в очередь. Так будет выглядеть постановка задачи в очередь:

lock (locker) { taskQ.Enqueue("my task"); Monitor.PulseAll(locker); // Мы изменили условие блокировки }

Поскольку изменяется потенциальное условие блокировки, необходимо выдать сигнал. Мы вызываем PulseAll, а не Pulse, так как нужно учитывать, что потребителей может быть несколько, соответственно, ожидать будут несколько потоков.

Далее, требуется, чтобы рабочие потоки были блокированы, когда им нечего делать, другими словами, когда очередь пуста. Следовательно, наше условие блокировки – taskQ.Count==0. Вот выполняющее это выражение Wait:

lock (locker) while (taskQ.Count == 0) Monitor.Wait(locker);

Следующий шаг – рабочий поток удаляет задачу из очереди и исполняет её:

lock (locker) while (taskQ.Count == 0) Monitor.Wait(locker); string task; lock (locker) task = taskQ.Dequeue();

То, что получилось, однако, не является потокобезопасным: во время удаления задачи информация о состоянии очереди может быть уже устаревшей, так как она получена в предыдущем выражении lock. Посмотрите, что получится, если запустить одновременно два потока-потребителя с одним-единственным элементом в очереди. Возможно, ни один из них не вошел бы в ожидание в цикле – оба увидели бы в очереди один элемент. Далее они оба попытались бы удалить из очереди этот элемент, с генерацией исключения в потоке, который будет делать это вторым. Для исправления ситуации будем удерживать lock немного дольше – до окончания взаимодействия с очередью:

string task; lock (locker) { while (taskQ.Count == 0) Monitor.Wait(locker); task = taskQ.Dequeue(); }

(Нет необходимости вызывать Pulse после удаления из очереди, так как никакой потребитель не может быть разблокирован имеющимися немногими элементами в очереди).

Как только задача удалена из очереди, нет никаких причин сохранять блокировку. Снятие блокировки в этом месте позволяет потребителю выполнять продолжительную задачу без ненужного блокирования других потоков.

Вот полный текст программы. Как и в версии с AutoResetEvent, постановка в очередь задачи со значением null будет сигналом потребителю на завершение (после завершения других недовыполненных задач). Поскольку потребителей может быть несколько, необходимо будет добавить по null -задаче на каждого, чтобы полностью завершить очередь:

Wait/Pulse шаблон #2: Очередь поставщик/потребитель

using System; using System.Threading; using System.Collections.Generic; public class TaskQueue: IDisposable { object locker = new object(); Thread[] workers; Queue<string> taskQ = new Queue<string>(); public TaskQueue(int workerCount) { workers = new Thread[workerCount]; // Создать и запустить отдельный поток на каждого потребителя for (int i = 0; i < workerCount; i++) (workers [i] = new Thread(Consume)).Start(); } public void Dispose() { // Добавить по null-задаче на каждого завершаемого потребителя foreach (Thread worker in workers) EnqueueTask(null); foreach (Thread worker in workers) worker.Join(); } public void EnqueueTask(string task) { lock (locker) { taskQ.Enqueue(task); Monitor.PulseAll(locker); } } void Consume() { while (true) { string task; lock (locker) { while (taskQ.Count == 0) Monitor.Wait(locker); task = taskQ.Dequeue(); } if (task == null) return; // Сигнал на выход Console.Write(task); Thread.Sleep(1000); // Имитация длительной работы } } }

Вот метод Main, в котором создается очередь задач, задаются два потока-потребителя, для которых в очередь ставится 10 задач:

static void Main() { using(TaskQueue q = new TaskQueue(2)) { Console.WriteLine("Помещаем в очередь 10 задач"); Console.WriteLine("Ожидаем завершения задач..."); for (int i = 0; i < 10; i++) q.EnqueueTask(" Задача" + i); } // Выход из using приводит к вызову метода Dispose двух TaskQueue, // завершая потребителей после выполнения всех задач. Console.WriteLine("\r\nВсе задачи выполнены!"); }

Консольный вывод:

Помещаем в очередь 10 задач Ожидаем завершения задач... Задача0 Задача1 (пауза...) Задача2 Задача3 (пауза...) Задача4 Задача5 (пауза...) Задача6 Задача7 (пауза...) Задача8 Задача9 (пауза...) Все задачи выполнены!

В соответствии с нашим шаблоном проектирования, если удалить PulseAll и заменить Wait на переключение блокировок, мы получим тот же самый результат.


Понравилась статья? Добавь ее в закладку (CTRL+D) и не забудь поделиться с друзьями:  



double arrow
Сейчас читают про: