Всем здравствуйте, есть у меня задача по архивации большого файла, при этом я его разбиваю на части, части складываю в очередь, далее обрабатываю каждую часть отдельным потоком, кладу в новую очередь и от туда уже пишу в файл, и тут два вопроса, подскажите как сделать проверку полноты очереди с блоками и ограничение этой очереди определенным количеством блоков.По логике я понимаю, что когда очередь полная, то поток должен ждать пока очередь опустеет до одного элемента, чтобы совсем не прерывать процесс.Просто сейчас у меня используется полностью процессор и полностью память, но когда она вся используется это не совсем верно Вот сам код пока в таком исполнении. Все сжимает.
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.IO.Compression;
using System.Threading;
namespace ConsoleApplication57
{
class Program
{
static void Main(string[] args)
{
string path = @"d:\4.stl";
string path2 = @"d:\Black Widow111.m4a";
string path_compres = @"d:\Compress.gz";
// определяю количество потоков, для обработки блоков использую количество ядер*2 и два потока на чтение данных по блокам и помещение их в очередь и финальную запись данных в файл
int core = Environment.ProcessorCount;
int thredCount = core * 2+2;
//ограничение элементов в очереди
int queue_max_count = core * 2;
PCQueue q = new PCQueue(thredCount);
// создаем очередь c блоками ланных
Queue<KeyValuePair<int, byte[]>> queue_block = new Queue<KeyValuePair<int, byte[]>>();
// создаем очередь с готовыми обработанными блоками
Queue<KeyValuePair<int, byte[]>> readyQueue = new Queue<KeyValuePair<int, byte[]>>();
// открываем поток
using (var fs = new FileStream(path, FileMode.Open))
// добавляем в очередь считанные блоки
ADD_Block_to_Queue(queue_block,fs);
// сжимаем/разжимаем и добавляем в готовую очередь в разных потоках
Console.WriteLine("Добавили в очередь задачи");
for (int i = 0; i < thredCount; i++)
{
int itemNumber = i;
q.EnqueueItem(() =>
{
// сжимаем/разжимаем и добавляем в готовую очередь
Add_obrabotanniy_block_to_readyQueue(queue_block, readyQueue);
Console.Write(" Задача " + itemNumber);
});
}
q.Shutdown(true);
// пишем в файл блоки
IOrderedEnumerable<KeyValuePair<int, byte[]>> ordered = readyQueue.OrderBy(e => e.Key);
foreach (var kv in ordered)
{
Write_Final_File(path_compres,readyQueue.Dequeue().Value);
}
Console.WriteLine();
Console.WriteLine("Workers complete!");
}
// добавляем в очередь считанные блоки
public static void ADD_Block_to_Queue(Queue<KeyValuePair<int,byte[]>> queue_block,Stream fs)
{
foreach (KeyValuePair<int, byte[]> block in Read_Blockk(fs))
{
queue_block.Enqueue(block);
}
}
// добавляем в очередm для готовых обработанных блоков
public static void Add_obrabotanniy_block_to_readyQueue(Queue<KeyValuePair<int, byte[]>> queue_block, Queue<KeyValuePair<int, byte[]>> readyQueue)
{
while (queue_block.Count > 0)
{
var block = queue_block.Dequeue();
var compressionBlock = COmpress(block.Key, block.Value);
readyQueue.Enqueue(compressionBlock);
}
}
// читаем данные из файла блоками
public static IEnumerable<KeyValuePair<int,byte[]>> Read_Blockk(Stream stream)
{
const int size_block=1024 * 1024; // определяем размер буфера=1мб
int index = 0; // номер блока
while (stream.Position<stream.Length)
{
// выделяем память под массив буффера.
byte[] buffer=new byte[System.Math.Min(size_block,stream.Length-stream.Position)];
stream.Read(buffer, 0, buffer.Length);
yield return new KeyValuePair<int, byte[]>(index++,buffer);
}
}
//сжимаем данные
public static KeyValuePair<int, byte[]> COmpress(int index,byte[] block)
{
using (var ms = new MemoryStream())
using (var gzStream=new GZipStream(ms,CompressionMode.Compress))
{
int ind = index;
gzStream.Write(block,0,block.Length);
gzStream.Close();
return new KeyValuePair<int, byte[]>(ind++,ms.ToArray());
}
}
// разжимаем данные
public static KeyValuePair<int, byte[]> DEcompres(int index,byte[] block)
{
using (var ms=new MemoryStream())
using (var gzstream=new GZipStream(ms,CompressionMode.Decompress))
{
int ind = index;
gzstream.Read(block, 0, block.Length);
gzstream.Close();
return new KeyValuePair<int, byte[]>(ind++,ms.ToArray());
}
}
// пишем даныне в файл
public static void Write_Final_File(string path, byte[] ReadyBlock)
{
using (var fsWrite = new FileStream(path, FileMode.Append, FileAccess.Write))
fsWrite.Write(ReadyBlock,0,ReadyBlock.Length);
}
}
}
Класс организации потоков.
using System;
using System.Threading;
using System.Collections.Generic;
public class PCQueue
{
readonly object _locker = new object();
Thread[] _workers;
Queue<Action> _itemQ = new Queue<Action>();
public PCQueue(int workerCount)
{
_workers = new Thread[workerCount];
// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
(_workers[i] = new Thread(Consume)).Start();
}
public void Shutdown(bool waitForWorkers)
{
// Enqueue one null item per worker to make each exit.
foreach (Thread worker in _workers)
EnqueueItem(null);
// Wait for workers to finish
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}
public void EnqueueItem(Action item)
{
lock (_locker)
{
_itemQ.Enqueue(item); // We must pulse because we're
Monitor.Pulse(_locker); // changing a blocking condition.
}
}
void Consume()
{
while (true) // Keep consuming until
{ // told otherwise.
Action item;
lock (_locker)
{
while (_itemQ.Count == 0) Monitor.Wait(_locker);
item = _itemQ.Dequeue();
}
if (item == null) return; // This signals our exit.
item(); // Execute item.
}
}
}