4

У меня есть 6 миллионов небольших (средний размер около 15 байт) файлов, которые мне нужно прочитать и далее обработать с помощью процессора. Я ранее реализовал это с помощью Task.Factory и всё работало без проблем на asp .net core 2.1. Занимало по времени около 20 часов.

Теперь я перенес приложение на asp.net 6 и на тестовом сервере мое веб-приложение перестает отвечать на любые запросы после запуска этих файловых операций и аварийно завершается. В логах я вижу ошибку System.OutOfMemoryException.

Я полагаю, что мой способ реализации далек от идеала. Я хотел бы узнать какие-нибудь иные способы многопоточной реализации этой работы или ваши замечания по текущему коду.

Метод контроллера ImportSignatures:

[HttpPost("ImportSignatures")]
public async Task<JsonResult> ImportSignatures()
{
    try
    {
        ImportSigningCertsResult res = await SignatureImportService.ImportSigningCerts();
        return Json(res);
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
        return Json(new ImportSigningCertsResult(e.Message, SignatureImportService.WasCancelled));
    }
}

Метод ImportSigningCerts:

public static async Task<ImportSigningCertsResult> ImportSigningCerts()
{
    LogsHelper.WriteEventLog("Запуск SignatureImportService");
    WasCancelled = false;
    IsWorking = true;
    ResultStr = "";
    totalSignatures = 0;
    processedSignatures = 0;
var cancelMsg = &quot;Импорт сертификатов был прерван. \n&quot;;
var endMsg = &quot;Импорт сертификатов успешно завершён. \n&quot;;
var toDelete = new List&lt;string&gt;();

try
{
    var configuration = SignatureImportConfiguration.FromCfg();

    using (s_tokenSource = new CancellationTokenSource())
    {
        IEnumerable&lt;string&gt; signatures = Directory.EnumerateFiles(configuration.Path, &quot;*.sig&quot;);
        totalSignatures = signatures.Count();

        Store mainStore = StoreMan.GetStore(&quot;Main&quot;);
        var importStats = new ImportStats();
        List&lt;Task&gt; tasks = new();

        int saveIndex = 1;
        const int proccessedForSave = 100000; // Через какое кол-во обработанных подписей произвести промежуточное сохранение хранилища и удаление подписей
        CancellationToken token = s_tokenSource.Token;

        ThreadPool.GetMinThreads(out int minWorkerThreads, out _);
        using SemaphoreSlim semaphore = new(minWorkerThreads);

        foreach (string path in signatures)
        {
            semaphore.Wait();

            if (WasCancelled)
                break;

            tasks.Add(Task.Factory.StartNew(() =&gt;
            {
                try
                {
                    token.ThrowIfCancellationRequested();

                    if (UploadSigningCerts(mainStore, path, importStats))
                    {
                        if (configuration.NeedCleaning)
                        {
                            lock (s_toDeleteListLockObj)
                                toDelete.Add(path);
                        }
                    }

                    Interlocked.Increment(ref processedSignatures);
                    lock (s_intermediateSaveLockObj)
                    {
                        if (processedSignatures &gt; proccessedForSave * saveIndex)
                        {
                            LogsHelper.WriteEventLog(&quot;Промежуточное сохранение хранилища сертификатов...&quot;);

                            mainStore.WriteIfChanged();
                            StartRemovingSignatures(toDelete);
                            saveIndex++;
                        }
                    }
                }
                catch (Exception e)
                {
                    if (e is not OperationCanceledException)
                        LogsHelper.WriteLog(&quot;SignatureImportService/ImportSigningCerts:Task.Factory.StartNew&quot;, e);
                }
                finally
                {
                    semaphore.Release();
                }
            }, token));
        }

        try
        {
            await Task.WhenAll(tasks);
        }
        catch (OperationCanceledException) { }

        mainStore.WriteIfChanged();
        StartRemovingSignatures(toDelete);
        ResultStr = (WasCancelled ? cancelMsg : endMsg) + $&quot;Certificates found: {importStats.all}. Was imported: {importStats.imported}.&quot; + (importStats.parsingFailed &gt; 0 ? $&quot; Unrecognized files: {importStats.parsingFailed}&quot; : &quot;&quot;);
    }

    LogsHelper.WriteEventLog(ResultStr);
    return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
}
finally
{
    IsWorking = false;
}

}

Метод UploadSigningCerts:

private static bool UploadSigningCerts(Store store, string path, ImportStats importStats)
{
    bool toBeDeleted = true;
    CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;
try
{
    List&lt;CertInfo&gt; certs = client.GetSignCmsInfo(File.ReadAllBytes(path)).Certs.ToList();

    Interlocked.Add(ref importStats.all, certs.Count);

    for (int i = 0; i &lt; certs.Count; i++)
    {
        lock (s_importLockObj)
        {
            // Код по валидации каждого сертификата из файла, принятие решения об импорте, импорт в хранилище...
        }
    }
    return toBeDeleted;
}
catch (Exception e)
{
    LogsHelper.WriteLog(&quot;SignatureImportService/UploadSigningCerts&quot;, e);
    LogsHelper.WriteEventLog($&quot;Ошибка импорта сертификата из подписи: {Path.GetFileName(path)};&quot;);
    Interlocked.Increment(ref importStats.errors);
    return false;
}

}

Метод StartRemovingSignatures:

private static void StartRemovingSignatures(List<string> toDelete)
{
    if (toDelete.Count > 0)
    {
        List<string> tempToDelete;
        lock (s_toDeleteListLockObj)
        {
            tempToDelete = new List<string>(toDelete);
            toDelete.Clear();
        }
    LogsHelper.WriteEventLog(&quot;Удаление успешно обработанных файлов подписей...&quot;);

    Task.Factory.StartNew(() =&gt;
    {
        tempToDelete.ForEach(path =&gt;
        {
            try
            {
                File.Delete(path);
            }
            catch (Exception e)
            {
                LogsHelper.WriteLog(&quot;ImportResult/DeleteSignatures&quot;, e);
            }
        });
    });
}

}

Текст ошибки:

20.08.2023 11:58:01 api/Settings/ImportSignatures
Exception of type 'System.OutOfMemoryException' was thrown.
   at System.Threading.Tasks.Task.EnsureContingentPropertiesInitializedUnsafe()
   at System.Threading.Tasks.Task.AssignCancellationToken(CancellationToken cancellationToken, Task antecedent, TaskContinuation continuation)
   at System.Threading.Tasks.Task.TaskConstructorCore(Delegate action, Object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
   at Store.Services.SignatureImportService.<>c__DisplayClass20_0.<ImportSigningCerts>b__0(String path)
   at System.Collections.Generic.List`1.ForEach(Action`1 action)
   at Store.Services.SignatureImportService.ImportSigningCerts()
   at Store.Controllers.SettingsController.ImportSignatures()

Updated: код был отредактирован

Dany
  • 43
  • 3
    6 млн файлов в одной папке - это ахтунг для большинства файловых систем. | Открытие/закрытие файла - это медленная операция. 6 млн открытий/закрытий - это очень долго. Глобальное решение проблемы: хранить эти подписи в небольшом количестве файлов. Или в какой-нибудь БД, например, в sqlite. – Alexander Petrov Aug 23 '23 at 14:37
  • 1
    На веб-серверах не рекомендуется использовать многопоточность/параллельность. Все потоки заняты обработкой этих файлов - нет свободных потоков для обработки запросов. Вот сервер и не отвечает. | Нужна асинхронность! Хотелось бы увидеть метод UploadSigningCerts. Можно не весь код, а только тот, что использует файл. – Alexander Petrov Aug 23 '23 at 14:41
  • @AlexanderPetrov, к сожалению, вопрос хранения подписей невозможно изменить... Имеем 6 млн файлов... Код метода UploadSigningCerts добавил. – Dany Aug 23 '23 at 19:50
  • Почему невозможно? Можно считать подпись и как binary закинуть в базу, например. К комментариям выше присоединяюсь, 6млн файлов это что-то слишком уже... – OwDafuq Aug 24 '23 at 08:46
  • Мой код: - а метод, в котором он находится не показали, что Это за метод, синхронный он, асинхронный, контроллер или что? вы просто уложили сервер таким кодом, вот пример https://ru.stackoverflow.com/q/1303748/373567, надо ограничивать использование ресурсов. – aepot Aug 24 '23 at 12:23
  • 1
    @Deadooshka нельзя использовать GC.Collect, и тем более другми это советовать, и тем более на серверах. GC при недостатке памяти и так всё агрессивно соберет. Ваш совет не только бесполезен на 100% но и вреден, может добавить тормозов к и без того нерабочему коду, а проблему не решит. – aepot Aug 24 '23 at 12:27
  • @aepot данный код - синхронный метод контроллера. – Dany Aug 24 '23 at 12:58
  • Добавьте сигнатуру метода в вопрос, то есть метод целиком, я могу сделать его асинхронным, чтобы не вешать сервер, но я не вижу самого метода, чтобы решить эту проблему. – aepot Aug 24 '23 at 13:12
  • Вот ещё что скажу. 6 млн файлов по 15 байт - это меньше ста мегабайт. Но! Каждый файл будет занимать минимум один кластер в файловой системе. По умолчанию кластер равен 4 кб. Итого 6 млн файлов займут 24 Гб! Меняйте способ хранения. Меняйте! – Alexander Petrov Aug 25 '23 at 00:26
  • @aepot, сделал вызов метода в контроллере и последующий вызов ImportSigningCerts асинхронными, немного рефакторинга. Ознакомьтесь, пожалуйста. С данным кодом всё стало работать раза в 2 быстрее... Под конец (после 4.5 млн обработанных файлов) опять словил ошибку по памяти - я думаю, это из-за коллекции Task-ов, которые у меня копятся для отслеживания в конце их финиша. Быть может, в моей ситуации стоит посмотреть в какую-то другую сторону, не использовать Task-и? – Dany Aug 25 '23 at 16:52
  • Да, стало лучше, но здесь много ещё чего можно улучшить. Я напишу ответ как освобожусь. – aepot Aug 25 '23 at 17:31
  • @aepot, читал ещё посты другие на stackoverflow, наткнулся на схожую проблему с памятью при работе с тасками. Там предложили разделить общее кол-во на "порции". У меня вот в коде есть как раз промежуточное сохранение через каждые 100тыс файлов - думаю, можно сделать ожидание завершения задач на этом этапе и затем приступать к обработке очередной "порции". Таким образом массив будет содержать максимально 100тыс задач вместо 6.5 млн. Что думаете о таком подходе? – Dany Aug 25 '23 at 18:00
  • А зачем вообще параллелизм. Разве диск по своей природе не синхронный? Может какие ssd-диски и позволяют аппаратно асинхронность, но в обычных дисках головка записи не может распараллелиться. – Deadooshka Aug 25 '23 at 18:12
  • @Deadooshka по мат части вам ничего не могу ответить, но опытным путём проверено, что в моём случае разделение на потоки даёт буст в 2 с лишним раза по скорости обработки всех файлов. Согласитесь - есть разница между 8ч и 20ч работы... Да и все эти операции включают в себя не только работу с файлами, но и процессорную часть... – Dany Aug 25 '23 at 18:41

1 Answers1

2

Давайте попробуем этот код стабилизировать

  • нужно избавиться от лишних делегатов
  • нет смысла гонять файловую систему из кучи потоков, она синхронная, она не может в рамках одного носителя выполнять кучу операций одновременно
  • нет смысла трогать настройки пула потоков, он нормально настроен, просто нужно относиться к нему бережно
  • нельзя блокировать поток контроллера, так вы вешаете весь сервер, для этого есть асинхронные операции
  • много локов - плохо для понимания кода и производительности, используйте потокобезопасные коллекции
  • процедура чтения int переменной атомарна сама по себе, поэтому lock не нужен для чтения, избавиться от лишних локов
  • полностью избавиться от статики
  • ограничить количество одновременно работающих задач до количества ядер сервера, умноженное на два (так исторически сложилось, что это оптимально, с этого надо начинать)
  • каждый созданный вами Task должен быть ожидаемым, так вы получаете контроль над процессом и удобнее ловить исключения
  • удаление файлов можно вообще через Producer/Consumer сделать, чтобы удалялись сразу как появляются в череди на удаление, при этом не тормозили основной код.

Вот и получается как-то так

[HttpPost("ImportSignatures")]
public async Task<JsonResult> ImportSignatures()
{
    SignatureImportService importService = new SignatureImportService();
    try
    {
        ImportSigningCertsResult res = await importService.ImportSigningCerts();
        return Json(res);
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
        return Json(new ImportSigningCertsResult(e.Message, importService.WasCancelled));
    }
}
public async Task<ImportSigningCertsResult> ImportSigningCerts()
{
    LogsHelper.WriteEventLog("Запуск SignatureImportService");
const string cancelMsg = &quot;Импорт сертификатов был прерван. \n&quot;;
const string endMsg = &quot;Импорт сертификатов успешно завершён. \n&quot;;
try
{
    using (_cts = new CancellationTokenSource())
    {
        var configuration = SignatureImportConfiguration.FromCfg();
        string[] signatures = Directory.GetFiles(configuration.Path, &quot;*.sig&quot;);
        totalSignatures = signatures.Length;
        processedSignatures = 0;

        Store mainStore = StoreMan.GetStore(&quot;Main&quot;);
        var importStats = new ImportStats();
        List&lt;Task&gt; tasks = new();

        int saveIndex = 1;
        const int proccessedForSave = 100000; // Через какое кол-во обработанных подписей произвести промежуточное сохранение хранилища и удаление подписей
        CancellationToken token = _cts.Token;
        Channel&lt;string&gt; channel = Channel.CreateUnbounded&lt;string&gt;();
        ChannelWriter&lt;string&gt; deleteQueue = channel.Writer;
        Task deleteTask = DeleteWorkerAsync(channel.Reader, token);
        object saveSync = new object();
        int maxJobs = Environment.ProcessorCount * 2;
        using SemaphoreSlim semaphore = new(maxJobs);
        try
        {
            foreach (string path in signatures)
            {
                await semaphore.WaitAsync(token);
                if (tasks.Count &gt; maxJobs * 2)
                {
                    for (int i = 0; i &lt; tasks.Count; i++) // не позволит списку тасков расти бесконечно
                    {
                        if (tasks[i].IsCompletedSuccessfully)
                        {
                            tasks.RemoveAt(i--);
                        }
                    }
                }
                tasks.Add(Task.Run(() =&gt;
                {
                    try
                    {
                        token.ThrowIfCancellationRequested();

                        if (UploadSigningCerts(mainStore, path, importStats) &amp;&amp; configuration.NeedCleaning)
                        {
                            deleteQueue.TryWrite(path);
                        }
                        bool needSave = false;
                        lock (saveSync)
                        {
                            if (++processedSignatures &gt; proccessedForSave * saveIndex)
                            {
                                saveIndex++;
                                needSave = true;
                            }
                        }
                        if (needSave)
                        {
                            LogsHelper.WriteEventLog(&quot;Промежуточное сохранение хранилища сертификатов...&quot;);
                            mainStore.WriteIfChanged();
                        }
                    }
                    catch (Exception e) when (e is not OperationCanceledException)
                    {
                        LogsHelper.WriteLog(&quot;SignatureImportService/ImportSigningCerts:Task.Factory.StartNew&quot;, e);
                    }
                    finally
                    {
                        semaphore.Release();
                    }
                }, token));
            }

            await Task.WhenAll(tasks);
            mainStore.WriteIfChanged();
            deleteQueue.Complete();
            await deleteTask;
        }
        catch (OperationCanceledException) { }

        string result = (token.IsCancellationRequested ? cancelMsg : endMsg) + $&quot;Certificates found: {importStats.all}. Was imported: {importStats.imported}.&quot; + (importStats.parsingFailed &gt; 0 ? $&quot; Unrecognized files: {importStats.parsingFailed}&quot; : &quot;&quot;);
        LogsHelper.WriteEventLog(result);

        return new ImportSigningCertsResult(result, token.IsCancellationRequested);
    }
}
finally
{
    _cts = null;
}

}

private bool UploadSigningCerts(Store store, string path, ImportStats importStats) { bool toBeDeleted = true; CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;

try
{
    CertInfo[] certs = client.GetSignCmsInfo(File.ReadAllBytes(path)).Certs.ToArray(); // здесь непонятно, что такое за коллекция Certs и зачем ее клонировать

    Interlocked.Add(ref importStats.all, certs.Length);

    for (int i = 0; i &lt; certs.Length; i++)
    {

    }
    return toBeDeleted;
}
catch (Exception e)
{
    LogsHelper.WriteLog(&quot;SignatureImportService/UploadSigningCerts&quot;, e);
    LogsHelper.WriteEventLog($&quot;Ошибка импорта сертификата из подписи: {Path.GetFileName(path)};&quot;);
    Interlocked.Increment(ref importStats.errors);
    return false;
}

}

private async Task DeleteWorkerAsync(ChannelReader<string> reader, CancellationToken token) { await foreach (string path in reader.ReadAllAsync(token)) { try { File.Delete(path); // если файловая система медленная, здесь можно await Task.Run(() => File.Delete(path)); } catch (Exception e) { LogsHelper.WriteLog("ImportResult/DeleteSignatures", e); } } }

bool переменные тоже бессмысленные, у вас есть потокобезопасный CTS, его и используйте, например так

public bool WasCancelled => _cts?.IsCancellationRequested ?? false;
public bool IsWorking => !_cts?.IsCancellationRequested ?? false;

private CancellationTokenSource _cts;

Избавляйтесь от статики дальше. И отвыкайте от списков в пользу массивов, если не надо динамически добавлять и удалять элементы из списка. Массивы легче и работают быстрее.

Вот еще почитайте в тему: c# как ускорить добавление потоков используя async-await

aepot
  • 49,560
  • 1
    В конструкции if (processedSignatures > proccessedForSave * saveIndex) без лока разве не может получиться так, что несколько потоков "залезут" в этот условный блок и получим множественное сохранение и инкремент индекса saveIndex? – Dany Aug 25 '23 at 19:32
  • 1
    @Dany поставил инкремент повыше. Так как потоков не много вероятность критически мала, но имеется, вы правы. С локом хуже получится. +Добавил проверочку в начало метода. – aepot Aug 25 '23 at 19:48
  • @Dany ок, уговорили, переписал на полностью потокобезопасный код, но само сохранение все равно за пределами лока. – aepot Aug 25 '23 at 20:03
  • Вы написали, что следует избавиться от статики - в чём причина? Если мне нужно получать какую-то информацию о данном классе в разных методах контроллера, статистику, то лучше реализовать класс как singleton, чем делать необходимые мне поля статическими? – Dany Aug 26 '23 at 15:59
  • @Dany вы пишете сервер, так? Сервер должен уметь обслуживать сколько угодно запросов одновременно, так? О какой статике может быть речь? Не закладывайте в архитектуру ни статику, не синглтоны. Как я понял, ответ не помог? – aepot Aug 26 '23 at 21:59
  • 1
    Нет, всё отлично. Спасибо) – Dany Aug 28 '23 at 16:48