5

Применение BlockingCollection, используя подход, когда элементы вытаскиваются из очереди(например ConcurrentQueue), используя метод Take в цикле - всегда блокирует поток. Очевидно, что процессорное время не занимается, однако поток все же занят и не может использоваться для выполнения других задач. Какая есть альтернатива, когда нужно последовательно вычитывать элементы из очереди и при этом не блокировать поток? Конечно, можно сделать велосипед, накрутить событий или чего-нибудь еще, но хотелось бы понять, нет ли каких-либо стандартных способов это сделать, кроме как использовать BlockingCollection и метод Take.

P.S. Есть метод TryTake, но я не могу найти решение с его использованием, эквивалентное использованию Take и при этом неблокирующее поток.

Sleeeper
  • 1,013

1 Answers1

6

Вам на самом деле нужен класс BufferBlock<T> из библиотеки Dataflow (nuget-пакет Microsoft.Tpl.Dataflow).

Этот класс заменяет собой BlockingCollection<T>, и позволяет асинхронный доступ:

await queue.ReceiveAsync()

Таким образом, поток не будет заблокирован. Но у вас получится async-интерфейс.

Больше примеров с работающим кодом есть в этом ответе.


Ещё одним вариантом является async-обёртка над IProducerCosumerCollection из AsyncEx Стивена Клири: https://github.com/StephenCleary/AsyncEx/wiki/AsyncCollection

VladD
  • 206,799
  • Спасибо большое за ответ. Правильно ли я понимаю, что BufferBlock это тоже враппер? То есть во внутрь я могу засунуть свою реализацию IProducerCosumerCollection? – Sleeeper Dec 18 '17 at 22:30
  • @Sleeeper: Насколько я понимаю, нет, он сам по себе готовый примитив. Но его можно конфигурировать. А для чего вам своя реализация? – VladD Dec 18 '17 at 22:31
  • Своя реализация для приоритетной очереди, которую я и использую в рамках BlockingCollection. Хотя, конечно, в рамках Dataflow тоже наверняка что-то есть. – Sleeeper Dec 18 '17 at 22:33
  • 1
    @Sleeeper: Добавил ещё один вариант в ответ. – VladD Dec 18 '17 at 22:42
  • VladD, спасибо Вам огромное! Интересно будет разобраться, как это работает. Как я понимаю, автор из Вашего ответа по сути и взял BlockingCollection и сделал ее асинхронный вариант. – Sleeeper Dec 18 '17 at 22:49
  • @Sleeeper: Ага, именно так. – VladD Dec 18 '17 at 22:56
  • подскажите пожалуйста, что я делаю не так. Есть некий набор адаптеров, каждый из которых запускает примерно такой код: запускается Async метод, внутри которого бесконечный while и await _queue.TakeAsync(). То есть у каждого адаптера своя очередь. Смотрю в диспетчере задач - потоки множатся. Может быть я делаю что-то не так? Я ожидал, что количество потоков будет расти только при большей нагрузке(то есть реальной активности), а в моменты ожидания их не будет. – Sleeeper Dec 19 '17 at 20:20
  • @Sleeeper: Странно, не должны вроде. А можно какой-нибудь пример того, что вы делаете? И интересно, что за потоки? Может быть, это ThreadPool увидел активность и решил на всякий случай динамически подрасти заранее, чтобы потом не создавать потоки из-за нехватки? (Он так иногда делает.) – VladD Dec 20 '17 at 01:35
  • Да нет, все оказалось проще, потоки создавались в другом месте. Спасибо Вам за помощь. – Sleeeper Dec 25 '17 at 22:11
  • @Sleeeper: А, ну слава богу, одной загадкой меньше. – VladD Dec 25 '17 at 22:13