1

Имеется .NET 7 (WPF) приложение, в котором пользователь может скачивать некоторые файлы. Мой план заключается в том, чтобы сначала собрать задачи скачивания в список (чанк) и уже потом сразу весь чанк задач выполнить. Но, видимо, я упускаю что-то очень важное, ибо скачивание файла начинается сразу же после создания задачи.

Метод скачивания, пробовал и цепочку и async/await:

public Task DownloadAsync(PostModel post, string saveDirectory, CancellationToken cancellationToken)
{
    var filePath = Path.Combine(saveDirectory, $"{post.Id}{Path.GetExtension(post.FileUrl)}");
    if (!File.Exists(filePath))
    {
        return _httpClient.GetAsync(post.FileUrl, cancellationToken)
            .ContinueWith(x => x.Result.Content.ReadAsByteArrayAsync(cancellationToken)
                .ContinueWith(y => File.WriteAllBytesAsync(filePath, y.Result, cancellationToken), cancellationToken), cancellationToken);
    //using var response = await _httpClient.GetAsync(post.FileUrl, cancellationToken);
    //var responseByteArray = await response.Content.ReadAsByteArrayAsync(cancellationToken);
    //await File.WriteAllBytesAsync(filePath, responseByteArray, cancellationToken);
}

return Task.FromResult(0);

}

Цикл, где используются скачивание (упростил для размещения):

var chunkDownloadTasks = new List<Task>();
foreach (var post in posts.Posts)
{
    var downloadTask = (GetContentTypeFromUrl(post.FileUrl) switch
    {
        ContentType.Image when settings.IncludeImages => _downloadService.DownloadAsync(post, GetFolder(settings.SavePath, ref imagesFolder, "Images"), cancellationToken),
        ContentType.Video when settings.IncludeVideo => _downloadService.DownloadAsync(post, GetFolder(settings.SavePath, ref videoFolder, "Video"), cancellationToken),
        ContentType.Gif when settings.IncludeGifs => _downloadService.DownloadAsync(post, GetFolder(settings.SavePath, ref gifFolder, "Gif"), cancellationToken),
        _ => null
    })?.ContinueWith(_ =>
    {
        status.Downloaded++;
        progress.Report(status);
    }, cancellationToken);
if (downloadTask == null)
{
    status.Skipped++;
    progress.Report(status);

    continue;
}

chunkDownloadTasks.Add(downloadTask);

if (chunkDownloadTasks.Count == ChunkSize)
{
    await Task.WhenAll(chunkDownloadTasks.ToArray());
    chunkDownloadTasks.Clear();
}

}

Jagailo
  • 2,081
  • 19
  • 32

1 Answers1

2

Так а суть проблемы то в чем, медленно качается, или что? Зачем эти все истории с чанками и т.д.? По коду - вы смешиваете async/await и TPL .ContinueWith, это затрудняет написание стабильного решения и усложняет отладку. Метод скачивания почему-то у вас всасывает все данные в память, зачем?

Далее, можно организовать Producer/Consumer, например вы хотите, чтобы одновременно качалось не более 8 файлов.

Кстати, покажу пример настройки _httpClient.

private static readonly HttpClient _httpClient = new(new HttpClientHandler { AutomaticDecompression = DecompressionMethods.All })
{ 
    DefaultRequestVersion = HttpVersion.Version20
};

Создаю consumer

private async Task PostDownloaderAsync(string saveDirectory, CancellationToken cancellationToken, ChannelReader<PostModel> reader)
{
    const int maxConcurrency = 8;
    using SemaphoreSlim semaphore = new(maxConcurrency);
    ConcurrentBag<Task> tasks = new();
    await foreach (PostModel post in reader.ReadAllAsync(cancellationToken))
    {
        await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        Task task = DownloadAsync(post, saveDirectory, cancellationToken, semaphore, tasks);
        tasks.Add(task);
        _ = task.ContinueWith(t => tasks.TryTake(t));
    }
    await Task.WhenAll(tasks).ConfigureAwait(false);
}

private async Task DownloadAsync(PostModel post, string saveDirectory, CancellationToken cancellationToken, SemaphoreSlim semaphore) { try { var filePath = Path.Combine(saveDirectory, $"{post.Id}{Path.GetExtension(post.FileUrl)}"); using var response = await _httpClient.GetAsync(post.FileUrl, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); using var stream = await response .EnsureSuccessStatusCode() // бросить исключение если сервер ответил ошибкой .Content .ReadAsStreamAsync(cancellationToken) .ConfigureAwait(false); using var fs = File.Create(filePath); await stream.CopyToAsync(fs, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { Debug.WriteLine(ex.Message); } finally { semaphore.Release(); } }

Кстати, здесь есть пример метода загрузки с отчётом о прогрессе и возможностью докачки недокачанного файла.

Далее, создаю канал передачи данных между генератором операций скачивания (producer) и исполнителем (consumer) и запускаю исполнителя

Channel<PostModel> channel = Channel.CreateUnbounded<PostModel>();
Task consumerTask = PostDownloaderAsync(saveDirectory, cancellationToken, channel.Reader);

Чтобы запусть операцию, нужно вызвать

await channel.Writer.WriteAsync(post, cancellationToken);

Чтобы остановить consumer, нужно вызвать

channel.Writer.Complete();
await consumerTask;

Ну или отменить CancellationTokenSource.

Вот и получится, что как только выполняете WriteAsync, сразу же начинается загрузка, но ждать её окончания не придётся. Если же вы закинете в очередь очень много загрузок сразу, то максимум одновременно будет качаться 8 штук, и как только очередная загрузка завершится, сразу же начнётся следующая, и так пока всё не скачается.

Вы можете вызвать Complete до завершения всех загрузок в любое время, хоть сразу после добавления всех задач, consumerTask завершится как всё докачается. Complete не прерывает процесс загрузок, он только сообщает каналу, что добавление в очередь завершено, чтобы цикл await foreach нормально завершился как только канал опустеет.

aepot
  • 49,560
  • А зачем тут хитрый приём с удалением таски из коллекции по завершении? Память экономите? ) – CrazyElf Oct 14 '23 at 10:19
  • @CrazyElf никто не сказал, что задач будет ограниченное количество. Можно конечно выбросить эту строчку, если их не много. Вдруг это какой-то парсер, который качает файлы неделями. – aepot Oct 14 '23 at 10:32
  • Не, так то приём интересный, но ситуация, когда таска сама себя из коллекции удаляет не кажется мне методологически правильной. Как-то это по-другому должно делаться, мне кажется. Объекты более высокого уровня должны управлять объектами ниже по иерархии, а не наоборот. Но возможно, я ошибаюсь. Не может, например, быть такого, что ContinueWith отработает уже после WhenAll и выхода из функции? Да, вероятность этого мала и ещё меньше вероятность того, что локальная коллекция tasks успеет утилизироваться. Но такая вероятность, мне кажется, есть и в результате будет что-то нехорошее. – CrazyElf Oct 14 '23 at 10:49
  • @CrazyElf ну ок, можно написать обертку, которая стартует таску, эвэйтит и удаляет, в итоге получится то же самое, только с лишней прослойкой. Методологически задача внутри метода скачивания ничего не знает ни про коллекци, не про то что надо себя откуда-то удалить. – aepot Oct 14 '23 at 13:32
  • а суть проблемы то в чем, медленно качается, или что? - Так это и не суть, но чанками качать маленькие файлы быстрее. > Метод скачивания почему-то у вас всасывает все данные в память, зачем? - Знаю, это пока что playground, просто играюсь с разными вещами. Я не эксперт, чтобы сразу идеальный код выдать, решаю проблемы по одной. Но спасибо за ваш ответ, буду изучать

    – Jagailo Oct 14 '23 at 15:37
  • А что за глобальный семафор (_semaphore) или это опечатка? – Jagailo Oct 14 '23 at 17:09
  • 1
    @Jagailo опечатка, поправил. Код на коленке писал. – aepot Oct 14 '23 at 18:16