Хочу сделать свой web-scraper многопоточным.
В работе scraper'а используется две коллекции - посещенных ссылок и очередь ссылок для обработки.
По ссылкам хожу через Selenium ChromeDriver, а чтобы распараллелить ChromeDriver - нужно работать через потоки, с Task'ами рабочих подходов не встретил.
По сути имеем producer/consumer, где несколько producer/consumer'ов в одном лице со следующими ограничениями:
- Нельзя плодить экземпляры браузера в безлимитном режиме (память)
- Нужно переиспользовать экземпляры, выделенные потоку повторно (долгая инициализация)
И казалось бы, тут очевидное решение - сделать несколько Thread'ов c общими коллекциями посещенных и не посещенных ссылок и «дело в шляпе», и пусть каждый поток работает до тех пор, пока в очереди что-то есть.
Но может возникнуть следующая неприятная ситуация. К примеру, есть 8 экземпляров браузера, в 8 потоках «гуляющие по сети».
И вот, в какой-то момент, очередь ссылок стала пуста, 1 поток все еще работает над текущей страницей, а остальные 7 поглядели, что обрабатывать им нечего и завершились.
И тут первый поток добавляет в очередь 10000 ссылок (всякое бывает), с которыми он вынужден разгребаться до скончания времен.
В теории возникает 4 ситуации:
| Очередь ссылок | Другие потоки | Ожидаемое поведение текущего потока |
|---|---|---|
| Есть ссылки | Есть работающие | Взять следующую ссылку из очереди |
| Есть ссылки | Есть ожидающие ссылок | Взять следующую ссылку из очереди |
| Пустая | Есть работающие | Перейти к ожиданию результатов других потоков |
| Пустая | Есть ожидающие ссылок | Завершить работу |
И вопрос мой заключается в том, как реализовать описанное в третьем пункте ожидание, чтобы, грубо говоря:
- Если очередь ссылок пуста, но есть потоки «в работе» - текущий поток должен ожидать изменения «статуса» любого из потоков.
- Как только какой-либо из соседей закончил текущую итерацию работы, повторить проверку.
- И только в случае, если все потоки «ждут», они должны завершиться.
В таком случае, потоки завершатся все разом и именно в тот момент, когда гарантированно не будет новых ссылок.
Вопрос - как?
Однопоточный код у меня примерно такого плана:
var unvisitedLinks = new ConcurrentQueue<IUrl>(startUrls);
var visitedLinks = new BlockingCollection<IUrl>();
while (unvisitedLinks.TryDequeue(out var currentUrl))
{
// Проверяем, что не посещали ссылку
if (visitedLinks.Any(link => link == currentUrl)) continue;
Navigate(currentUrl);
// Извлечение информации
var scrapedObjects = ScrapeObjects(currentUrl, otherParams);
foreach (var scrapedObject in scrapedObjects)
{
if (scrapedObject is IUrl url) unvisitedLinks.Enqueue(url);
if (scrapedObject is IScrapingResult item) yield return item;
}
// Добавляем ссылку в посещенные
visitedLinks.Add(currentUrl);
// Делаем паузу
await Task.Delay(delay);
}
BlockingCollection, пример по ссылке в синем блоке в самом верху. Дополнительная очередь здесь не нужна, совсем. – aepot Dec 27 '22 at 12:57BlockingCollectionрешает задачу "усыпления" потоков, которые ждут данных, поясните только, какой должен быть код, чтобы завершить все потоки, работающие с ней. В произвольном потоке я не могу вызвать CompleteAdding, т.к. он не знает, положат ли в коллекцию что-то другие потоки. – mainefremov Dec 28 '22 at 11:05BlockingCollection.CompleteAddingв N+1 потоке, но только в том случае, если все N потоков-обработчиков находятся в состоянии блокировки? – mainefremov Dec 28 '22 at 12:21.CompleteAdding()и все циклы завершаются. Вызвать для отдельного потока не получится, только для всех сразу. CompleteAdding сообщает всем, что данных больше не будет, происходит нормальное завершение циклаforeach. Вызов можно делать из какого угодно потока, как и добавлять элементы,BlockingCollectionполностью потокобезопасна. – aepot Dec 28 '22 at 13:42ThreadState.WaitSleepJoin, а в коллекции не будет элементов, то он будет вызывать.CompleteAdding()? – mainefremov Dec 28 '22 at 14:57CompleteAdding, все воркеры завершатся. Не вижу никакого смысла перезапускать воркеры когда добавление не окончено, пусть висят себе, ждут. А так, вы знаете, как запустить всё с нуля, как полностью все остановить, что мешает использовать эти приёмы когда нужно? ВызовCompleteAddingне останавливает ничего сразу, завершение произойдет именно тогда, когда всё будет обработано полностью, и коллекция будет пустая. Протестируйте. – aepot Dec 28 '22 at 15:03BlockingCollection bc, пройти по ссылке браузером и собрать новые ссылки. Свежесобранные ссылки добавляются вbc. Ни один из потоков не знает, закончили ли другие своё добавление, т.е. ни один не имеет права вызвать.CompleteAdding. В итоге все рано или поздно повиснут вforeach (var url in bc.GetConsumingEnumerable()) {/* */}. А гарантия, что все ссылки добавлены возникает как раз только в тот момент, когдаbc.Count == 0и ни один из потоков "не гуляет по вебу" в поисках ссылок. – mainefremov Dec 28 '22 at 15:22WaitHanle.WaitAll, в комментарий, к сожалению, не влезает, а вопрос закрыт как дубль. Разместил тут. – mainefremov Dec 30 '22 at 00:09Task, и комбинаторы для ожидания. А вы голые потоки теребите.BlockingCollection, это тоже как бы устаревший инструмент. – aepot Dec 30 '22 at 00:31