0

Всем здравствуйте, есть у меня задача по архивации большого файла, при этом я его разбиваю на части, части складываю в очередь, далее обрабатываю каждую часть отдельным потоком, кладу в новую очередь и от туда уже пишу в файл, и тут два вопроса, подскажите как сделать проверку полноты очереди с блоками и ограничение этой очереди определенным количеством блоков.По логике я понимаю, что когда очередь полная, то поток должен ждать пока очередь опустеет до одного элемента, чтобы совсем не прерывать процесс.Просто сейчас у меня используется полностью процессор и полностью память, но когда она вся используется это не совсем верно Вот сам код пока в таком исполнении. Все сжимает.

 using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.IO.Compression;
    using System.Threading;

    namespace ConsoleApplication57
    {
        class Program
        {
            static void Main(string[] args)
            {

     string path = @"d:\4.stl";
     string path2 = @"d:\Black Widow111.m4a";
     string path_compres = @"d:\Compress.gz";

                // определяю количество потоков, для обработки блоков использую количество ядер*2 и два потока на чтение данных по блокам и помещение их в очередь и финальную запись данных в файл
                int core = Environment.ProcessorCount;
                int thredCount = core * 2+2;

            //ограничение элементов в очереди
                int queue_max_count = core * 2;

                PCQueue q = new PCQueue(thredCount);


    // создаем очередь c блоками ланных
            Queue<KeyValuePair<int, byte[]>> queue_block = new Queue<KeyValuePair<int, byte[]>>();
    // создаем очередь с готовыми обработанными блоками
            Queue<KeyValuePair<int, byte[]>> readyQueue = new Queue<KeyValuePair<int, byte[]>>();


    // открываем поток
                using (var fs = new FileStream(path, FileMode.Open))


    // добавляем в очередь считанные блоки  

         ADD_Block_to_Queue(queue_block,fs);


                // сжимаем/разжимаем и добавляем в готовую очередь в разных потоках
                Console.WriteLine("Добавили в очередь задачи");

                for (int i = 0; i < thredCount; i++)
                {
                    int itemNumber = i;     
                    q.EnqueueItem(() =>
                    {
                        // сжимаем/разжимаем и добавляем в готовую очередь  
                        Add_obrabotanniy_block_to_readyQueue(queue_block, readyQueue);
                        Console.Write(" Задача " + itemNumber);
                    });
                }

                q.Shutdown(true);

 // пишем в файл блоки

            IOrderedEnumerable<KeyValuePair<int, byte[]>> ordered = readyQueue.OrderBy(e => e.Key);

                foreach (var kv in ordered)
                {
                  Write_Final_File(path_compres,readyQueue.Dequeue().Value);
                }
                Console.WriteLine();
                Console.WriteLine("Workers complete!");    






            }


            // добавляем в очередь считанные блоки  
            public static void ADD_Block_to_Queue(Queue<KeyValuePair<int,byte[]>> queue_block,Stream fs)
            {
                foreach (KeyValuePair<int, byte[]> block in Read_Blockk(fs))
                {
                    queue_block.Enqueue(block);

                }

            }


            // добавляем в  очередm для готовых обработанных блоков
            public static void Add_obrabotanniy_block_to_readyQueue(Queue<KeyValuePair<int, byte[]>> queue_block, Queue<KeyValuePair<int, byte[]>> readyQueue)
            {
                while (queue_block.Count > 0)
                {
                    var block = queue_block.Dequeue();
                    var compressionBlock = COmpress(block.Key, block.Value);
                    readyQueue.Enqueue(compressionBlock);

                }

            }

            // читаем данные из файла блоками         
            public static IEnumerable<KeyValuePair<int,byte[]>> Read_Blockk(Stream stream)
            {
                const int size_block=1024 * 1024; // определяем размер буфера=1мб

                int index = 0; // номер блока
                while (stream.Position<stream.Length)
                {
                    // выделяем память под массив буффера.
                    byte[] buffer=new byte[System.Math.Min(size_block,stream.Length-stream.Position)];

                    stream.Read(buffer, 0, buffer.Length);
                    yield return new KeyValuePair<int, byte[]>(index++,buffer);
                }

            }


            //сжимаем данные
            public static KeyValuePair<int, byte[]> COmpress(int index,byte[] block)
            {
                using (var ms = new MemoryStream())
                using (var gzStream=new GZipStream(ms,CompressionMode.Compress))
                {

                    int ind = index;
                    gzStream.Write(block,0,block.Length);
                    gzStream.Close();

                     return new KeyValuePair<int, byte[]>(ind++,ms.ToArray());


                    }

                }


            // разжимаем данные
            public static KeyValuePair<int, byte[]> DEcompres(int index,byte[] block)
            {

                using (var ms=new MemoryStream())
                using (var gzstream=new GZipStream(ms,CompressionMode.Decompress))
                {
                    int ind = index;

                    gzstream.Read(block, 0, block.Length);
                    gzstream.Close();
                    return new KeyValuePair<int, byte[]>(ind++,ms.ToArray());




                }




            }


            // пишем даныне в файл
            public static void Write_Final_File(string path, byte[] ReadyBlock)
            {
                using (var fsWrite = new FileStream(path, FileMode.Append, FileAccess.Write))

                fsWrite.Write(ReadyBlock,0,ReadyBlock.Length);


            }


            }



        }


Класс организации потоков.

    using System;
    using System.Threading;
    using System.Collections.Generic;

    public class PCQueue
    {
    readonly object _locker = new object();
    Thread[] _workers;
    Queue<Action> _itemQ = new Queue<Action>();

    public PCQueue(int workerCount)
    {
    _workers = new Thread[workerCount];

    // Create and start a separate thread for each worker
    for (int i = 0; i < workerCount; i++)
    (_workers[i] = new Thread(Consume)).Start();
    }

    public void Shutdown(bool waitForWorkers)
    {
    // Enqueue one null item per worker to make each exit.
    foreach (Thread worker in _workers)
    EnqueueItem(null);

    // Wait for workers to finish
    if (waitForWorkers)
    foreach (Thread worker in _workers)
    worker.Join();
    }

    public void EnqueueItem(Action item)
    {
    lock (_locker)
    {
    _itemQ.Enqueue(item);           // We must pulse because we're
    Monitor.Pulse(_locker);         // changing a blocking condition.
    }
    }

    void Consume()
    {
    while (true)                        // Keep consuming until
    {                                   // told otherwise.
    Action item;
    lock (_locker)
    {
    while (_itemQ.Count == 0) Monitor.Wait(_locker);
    item = _itemQ.Dequeue();
    }
    if (item == null) return;         // This signals our exit.
    item();                           // Execute item.
    }
    }
    }
  • Это читали: https://ru.stackoverflow.com/q/428327/218063 ? – Андрей NOP Jul 30 '18 at 09:39
  • @АндрейNOP читал вот это http://www.albahari.com/threading/part4.aspx#_Wait_Pulse_Producer_Consumer_Queue – Vladimr Vladimirovoch Jul 30 '18 at 09:42
  • Возможный дубликат вопроса: Имплементация Producer/Consumer pattern – Grundy Jul 30 '18 at 09:47
  • @Grundy Это не дубликат. речь про другое, но спасибо за ответ – Vladimr Vladimirovoch Jul 30 '18 at 10:06
  • Читающий поток, закончив чтение файла и наполнение очереди, вполне может завершить свою работу. А (рас)пакующие потоки в это время ещё будут работать. Аналогично, когда обрабатывающие потоки завершат свою работу, заполнив очередь на запись, они могут завершиться, а записывающий поток ещё будет работать. Поэтому эти два потока должны управляться отдельно от других, имхо. – Alexander Petrov Jul 30 '18 at 10:56
  • Лучше бы вы взяли готовый пример паттерна, о котором уже знаете. Там и правильное создание потоков/задач, там и ограничение размера очередей, и грамотное завершение... Хотя, конечно, написать свой класс организации потоков полезно для получения опыта. / Ограничить размер очереди: перед помешением в неё очередного блока, проверяете её размер - если он достиг определённого размера, ждёте, пока один из обрабатывающих потоков не выгребет блок. – Alexander Petrov Jul 30 '18 at 11:01
  • @AlexanderPetrov Спасибо за ответ, но я вообще то склоняюсь к той мысли. что да два потока работают вообще отдельно, С ограничением очереди сейчас попробую разобраться. – Vladimr Vladimirovoch Jul 30 '18 at 11:05
  • при распаковке ..., как мне использовать номер блока - о, тут вы попали... Самый простой способ: прочитать все сжатые блоки в какую-либо коллекцию (тут может памяти не хватить) и отсортировать по номеру блока. Потом разжимать последовательно (одним потоком - забываем о многопоточности). А так, нужно вводить счетчик текущего записанного номера блока (изменять его атомарно!) и в каждом потоке следить за ним. – Alexander Petrov Jul 30 '18 at 11:08
  • @AlexanderPetrov да я столкнулся с тем, что блоки сжались и записались в очередь для готовых блоков не по порядку , это я увидел чрез отладчик, далее, от того у меня при распаковке даже mp3 не играл, что и натолкнуло меня на мысль о неверном порядке,и тут я взял и просто отсортировал // пишем в файл блоки
            IOrderedEnumerable<KeyValuePair<int, byte[]>> ordered = readyQueue.OrderBy(e => e.Key);
    
                foreach (var kv in ordered)
                {
                  Write_Final_File(path_compres,readyQueue.Dequeue().Value);
                }
    
    – Vladimr Vladimirovoch Jul 30 '18 at 11:13
  • А как реализовать ваш пример,про отомарность, Я сортирую уже готовую коллекцию блоков, и потом уже потом по прядку ее пишу,но что то мне подсказывает это не рационально – Vladimr Vladimirovoch Jul 30 '18 at 11:16
  • @AlexanderPetrov подскажите пожалуйста с правильной проверкой очереди на переполнение, если вам несложно – Vladimr Vladimirovoch Jul 30 '18 at 13:18
  • @AlexanderPetrov Здравствуйте, спасибо вам за ответы, просьба. подскажите пожалуйста про запись индекса, и как лучше использовать номер блока. Заранее спасибо. – Vladimr Vladimirovoch Jul 31 '18 at 08:44

0 Answers0