0

Есть метод, этот метод на вход получает массив данных определенного размера, сжимает его и возвращает массив уже сжатых данных. Вот код:

 public byte[] Compres(byte[] dataBytes)// метод для сжатия
        {
            using (var CompresStream=new MemoryStream())// создаем поток мемористрим
            {
                using (var gzStream=new GZipStream(CompresStream,CompressionMode.Compress))
                {
                    gzStream.Write(dataBytes,0,dataBytes.Length);
                    gzStream.Close();
                    return CompresStream.ToArray();
                }
            }
        }

Мне нужно, чтобы на выходе из метода была структура данных такая [номер блока=4байта][размер сжатых данных=4 байта][сами сжатые данные] Подскажите пожалуйста как правильно это сделать, я новичек, умом понимаю как но как реализовать пока не очень хорошо понимаю.

Kromster
  • 13,809
  • Дока по Stream https://msdn.microsoft.com/ru-ru/library/system.io.stream(v=vs.110).aspx Записывайте через Write ваши байты. Понадобится ещё Position. Integer В байты переводит BitConverter – nick_n_a Jul 12 '18 at 06:22
  • @nick_n_a Спасибо за комментарий, в моем случае я могу прямо к CompresStream дописывать байты? Хотелось бы увидеть пример – Vladimr Vladimirovoch Jul 12 '18 at 06:31
  • Да. Так как размер зараннее не извесно - записать 4 ноля, потом Position, Write и Position=end. – nick_n_a Jul 12 '18 at 06:33
  • @nick_n_a такой код не подойдет, попросят переделать – slippyk Jul 12 '18 at 06:54
  • во-первый, в сам метод номер блока не передается - уже структура на выходе не получится, да этого и не нужно. – slippyk Jul 12 '18 at 06:55
  • @slippyk спасибо за комментарий, а тогда как лучше сделать,с учетом того чтобы потом каждую часть отдавать отдельному потоку на сжатие – Vladimr Vladimirovoch Jul 12 '18 at 07:03

2 Answers2

1

Я решал эту задачу так:

  • создавал две очереди, в которых хранил пары значений (номер_блока, байты_данных_этого_блока) Queue<KeyValuePair<int, byte[]>>;
  • в первую очередь я читал файл поблочно, потом брал оттуда блоки для компрессии/декомпрессии и помещах их во вторую очередь;
  • а уже из второй очереди писал их в файл.

Соответственно, один поток читает файл и помещает необработанные блоки в первую очередь,несколько потоков берут из нее блоки для обработки (сжатие/расжатие) и кладут готовые во вторую очередь, и один поток для записи обработанных блоков в файл. Суть ведь тестового задания в многопоточности, если я правильно думаю.

Queue<KeyValuePair<int, byte[]>> - из такой структуры мне было просто получить номер блока, узнать его размер и записать последовательно все данные в файл.

slippyk
  • 6,161
  • Я набросал, что то подобное в самом первом своем варианте, но потом что то свернул его, да в задаче главное это много поточность и то, что нужно каждый блок блок жать и писать отдельно от другого блока. Вопрос , а на каком этапе вы приписывали к блоку именно KeyValuePair ? – Vladimr Vladimirovoch Jul 12 '18 at 07:41
  • при чтении файла блоками – slippyk Jul 12 '18 at 08:00
0

Вот вам законченный пример с записью в выходной поток номера блока и размера сжатого блока. Пишется это с помощью BinaryWriter.

Никакого задела на будущую многопоточность тут не предусмотрено.

using System;
using System.IO;
using System.IO.Compression;

namespace ConApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                using (var inFileStream = new FileStream("input.dat", FileMode.Open))
                using (var outFileStream = new FileStream("output.dat", FileMode.Create))
                using (var binaryWriter = new BinaryWriter(outFileStream))
                {
                    var uncompressed = new byte[100000];
                    int blockNumber = 0;

                    while (true)
                    {
                        int length = inFileStream.Read(uncompressed, 0, uncompressed.Length);
                        if (length == 0)
                            break;

                        var compressed = Compres(uncompressed, length);

                        binaryWriter.Write(blockNumber);
                        binaryWriter.Write(compressed.Length);
                        binaryWriter.Write(compressed);

                        //Console.WriteLine(length + "\t" + blockNumber + "\t" + compressed.Length);
                        blockNumber++;
                    }
                }
            }
            catch (Exception ex) { Console.WriteLine(ex); }
        }

        static byte[] Compres(byte[] dataBytes, int length)
        {
            using (var compresStream = new MemoryStream())
            {
                using (var gzStream = new GZipStream(compresStream, CompressionMode.Compress))
                {
                    gzStream.Write(dataBytes, 0, length);
                }
                return compresStream.ToArray();
            }
        }
    }
}
  • т.к. размер блока для чтения фиксирован, можно расчитать сколько таких блоков будет в файле. эта информация тоже пригодится при декомпрессии – slippyk Jul 12 '18 at 11:28
  • @slippyk Здравствуйте, ну вот собрал я свое приложение, пока без многопоточности,к вам вопрос про ваш алгоритм, а именно создавал две очереди, в которых хранил пары значений (номер_блока, байты_данных_этого_блока) Queue<KeyValuePair<int, byte[]>>; в первую очередь я читал файл поблочно, потом брал оттуда блоки для компрессии/декомпрессии и помещах их во вторую очередь; а уже из второй очереди писал их в файл. Вот вопрос к вам, а как вы брали блоки из очереди с блоками и отдавали потоку , и сколько потоков можно открывать, я же не могу сделать 648 потоков, какая логика, подскажите? – Vladimr Vladimirovoch Jul 26 '18 at 14:02
  • @VladimrVladimirovoch один поток на чтение файла и занесение необработанных блоков в первую очередь. количество ядер процессора * 2 - столько потоков на обработку блоков из первой очереди и помещения их во вторую. И один поток на запись блоков из второй очереди в файл. – slippyk Jul 26 '18 at 15:53
  • @slippyk а очередь как то нужно ограничивать по количеству элементов, ну например она чтобы не переполняла память? А чисто технически для организации потоков, нужно брать какую то конструкцию типа producer-consumer или как то по другому это организуется? Что рациональнее? – Vladimr Vladimirovoch Jul 27 '18 at 06:38
  • @VladimrVladimirovoch очередь ограничивать нужно чтобы не было переполнения. для организации потоков я использовал producer-consumer. – slippyk Jul 27 '18 at 07:01
  • @slippyk да я как раз и думал про ограничение очереди, мы ограничиваем ее количеством элементов, если она полная, поток, который читает по блокам и добавляет в очередь приостанавливаем пока очередь не разгрузится верно? и еще по producer-consumer может подскажите где посмотреть. я много информации нашел, только под разные задачи паттерн переделывается, грубо говоря где можно увидеть правильную информацию.И как вычислить правильный размер очереди, сейчас она у меня без ограничения, и если я беру файл в 600 мб то очередь тоже примерно метров 500 занимает в памяти – Vladimr Vladimirovoch Jul 27 '18 at 07:25
  • @VladimrVladimirovoch producer-consumer, размер очереди = количество потоков для обработки – slippyk Jul 27 '18 at 07:34
  • @slippyk по размеру очереди получается, что если у меня на коме 2 ядра, то я делаю 2*2 количество потоков на обработку,+ 1 поток на чтение по блокам и помещение в очередь, и еще один поток на извлечение из готовой очереди и запись файла, то есть у меня для обработки 4 потока, значит и очереди будут ограничены 4 элементами? Так? – Vladimr Vladimirovoch Jul 27 '18 at 08:01
  • @VladimrVladimirovoch как вариант да, но точно не меньше. задача ведь нагрузить процессор, а не память. – slippyk Jul 27 '18 at 08:16
  • @VladimrVladimirovoch - количество ядер процессора рекомендуют умножать на 2 в том случае, если потоки могут частично простаивать (обычно на операциях ввода-вывода - IO). А если потоки заняты чисто вычислительными задачами и загружены полностью, то их следует создавать ровно по количеству ядер. – Alexander Petrov Jul 27 '18 at 09:56
  • @AlexanderPetrov Спасибо за ответ, в моем случае поток будет простаивать потому, что очередь ограничена с блоками, пока очередь полная поток стоит, правда бьюсь с организацией потоков, не могу понять как producer cunsumer применить именно ко моей задачи и пример сложно найти – Vladimr Vladimirovoch Jul 28 '18 at 08:45