0

Почему очередь не передаёт данные через мютекс из процесса в поток ??

Вот код :

// Применить мьютекс. 
using System;
using System.Threading;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;

// В этом классе содежится общий ресурс (переменная Queue), 
// а также мьютекс(Mtx), управляющий доступом к ней.  
class SharedRes
{
    //static readonly Queue<string> data = new Queue<string>();

    public static Queue<string> data = new Queue<string>();

    public static Mutex Mtx = new Mutex();
}

// В этом потоке переменная SharedRes.Queue инкрементируется. 
class IncThread
{
    int num;
    public Thread Thrd;

    public IncThread(string name)
    {
        Thrd = new Thread(this.Run);
        Thrd.Name = name;
        Thrd.Start();
    }

    // Точка входа в поток.  
    void Run()
    {
        ProcessStartInfo startInfo = new ProcessStartInfo()
        {
            FileName = "ping",
            Arguments = "www.ya.ru",
            RedirectStandardOutput = true,
            UseShellExecute = false,
            CreateNoWindow = true
        };

        using (var pingProcess = new Process())
        {
            pingProcess.StartInfo = startInfo;
            pingProcess.OutputDataReceived += OutputDataReceived;
            pingProcess.Start();
            pingProcess.BeginOutputReadLine();

            // На этом методе будет ожидание завершения запущенного процесса.
            // При получении каждой новой порции данных
            // будет вызываться событие OutputDataReceived.
            pingProcess.WaitForExit();
        }
        Console.WriteLine(Thrd.Name + " ожидает мютекс.");
    }

    private static void OutputDataReceived(object sender, DataReceivedEventArgs e)
    {
        // Получить мьютекс. 
        SharedRes.Mtx.WaitOne();


        Console.WriteLine("Мы в потоке один");

        SharedRes.data.Enqueue(e.Data);
        //Console.WriteLine(e.Data);
        using (var writer = File.AppendText("log.txt"))
            writer.WriteLine(e.Data);

        // Освободить мьютекс. 
        SharedRes.Mtx.ReleaseMutex();
    }
}

// В этом потоке переменная SharedRes.Count декрементируеться. 
class DecThread
{
    int num;
    public Thread Thrd;

    public DecThread(string name)
    {
        Thrd = new Thread(new ThreadStart(this.Run));
        Thrd.Name = name;
        Thrd.Start();
    }

    // ТОчка входа в поток.  
    void Run()
    {
        // Получить мьютекс. 
        SharedRes.Mtx.WaitOne();

        Console.WriteLine("Мы в потоке два");

        while (SharedRes.data.Count > 0)
                Console.WriteLine(SharedRes.data.Dequeue());

        // Освободить мьютекс. 
        SharedRes.Mtx.ReleaseMutex();
    }
}

class MutexDemo
{
    static void Main()
    {
        // Construct three threads.  
        IncThread mt1 = new IncThread("Инкрементирующий поток");

        Thread.Sleep(1); // let the Increment thread start 

        DecThread mt2 = new DecThread("Декрементирующий поток");

        mt1.Thrd.Join();
        mt2.Thrd.Join();
    }
}

Вот вывод :

dima@komp:~/mita/TCP_IP$ mcs lovla_terminal7.cs 
dima@komp:~/mita/TCP_IP$ mono lovla_terminal7.exe 
Мы в потоке два
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
^C
timob256
  • 2,907

1 Answers1

2

Попробую разобрать все проблемы, которые я увидел в коде.

  1. Я бы не использовал Mutex для такой задачи, как и AutoResetEvent, так как оба тяжелые, и если вход в некую область выполнения кода должен производиться одновременно только одним потоком, я бы использовал ManualResetEventSlim или просто lock() какого-нибудь общего объекта.

А Mutex я бы вообще оставил для кросс-процессной синхронизации, например, с помощью него можно сделать, чтобы нельзя было запустить 2 копии программы. Его именно для этого чаще всего и используют.

  1. В вашем же случае второй поток стартует и пытается освободить еще пустую очередь, и сразу же завершается. Проблема наверное была не в мьютексе. Ну или я неправильно понял, как вы хотели, чтобы оно работало.

Если задача состоит в том, чтобы сначала один поток заполнил очередь, а второй потом ее освободил, то вы неправильно вызываете потоки, у вас должно получиться что-то такое.

static void Main()
{
    IncThread mt1 = new IncThread("Инкрементирующий поток");
    mt1.Thrd.Join();
DecThread mt2 = new DecThread(&quot;Декрементирующий поток&quot;);
mt2.Thrd.Join();

}

И да, здесь вообще теряется весь смысл многопоточности, ну кроме того, что она есть и работает. Эффекта от нее нет.

  1. Лучше использовать Task вместо Thread.

  2. Если же нужна реализация фокуса, состоящего в том, чтобы получать данные от внешнего процесса и одновременно по факту получения забирать их в другом потоке, то в бой вступают конкурентные коллекции, специально предназначенные для многопоточности. Моя любимая коллекция из таких - BlockingCollection.

Итак, попробую воплотить в коде то, что понаписал выше. На базе обычного консольного приложения.

public class Program
{
    static void Main(string[] args)
    {
        // Запускаем задачи
        Task task1 = new Producer("Передающая задача").Task;
        Task task2 = new Consumer("Принимающая задача").Task;
    // И ждем их завершения
    Task.WaitAll(task1, task2);
    Console.WriteLine(&quot;Все задачи завершены.&quot;);

    // Ну и чтобы окошко сразу не закрылось
    Console.ReadKey();
}

} static class SharedRes { // та самая блокирующая коллекция public static BlockingCollection<string> Data { get; } = new BlockingCollection<string>(); }

class Producer { public Task Task { get; } public string Name { get; }

public Producer(string name)
{
    Name = name;
    Task = Task.Run(Run);
}

void Run()
{
    ProcessStartInfo startInfo = new ProcessStartInfo()
    {
        FileName = &quot;ping&quot;,
        Arguments = &quot;www.ya.ru&quot;,
        RedirectStandardOutput = true,
        UseShellExecute = false,
        CreateNoWindow = true
    };

    using (var pingProcess = new Process())
    {
        pingProcess.StartInfo = startInfo;
        pingProcess.OutputDataReceived += OutputDataReceived;
        pingProcess.Start();
        pingProcess.BeginOutputReadLine();

        pingProcess.WaitForExit();
    }
    SharedRes.Data.CompleteAdding(); // отправить сигнал в BlockingCollection, что пора заканчивать
    Console.WriteLine(Name + &quot; завершает работу.&quot;);
}

private static void OutputDataReceived(object sender, DataReceivedEventArgs e)
{
    Console.WriteLine(&quot;Мы в задаче один&quot;);
    SharedRes.Data.Add(e.Data);
    // немного упростил запись в лог
    File.AppendAllText(&quot;log.txt&quot;, e.Data + Environment.NewLine);
}

}

class Consumer { public Task Task { get; } public string Name { get; }

public Consumer(string name)
{
    Name = name;
    Task = Task.Run(Run);
}

void Run()
{
    // Здесь коллекция ждет, пока не появится новый элемент или пока не поступит сигнал заканчивать
    foreach (string s in SharedRes.Data.GetConsumingEnumerable())
    {
        Console.WriteLine(&quot;Мы в задаче два&quot;);
        Console.WriteLine(s);
    }
    Console.WriteLine(Name + &quot; завершает работу.&quot;);
}

}

Можно еще упростить код и сделать его приятнее с точки зрения логичности и читабельности, а также не блокировать основной поток при ожидании дочерних, с помощью async/await, но это уже другая история.

Вывод:

Мы в задаче один
Мы в задаче два

Мы в задаче один Мы в задаче два Pinging ya.ru [87.250.250.242] with 32 bytes of data: Мы в задаче один Мы в задаче два Reply from 87.250.250.242: bytes=32 time=17ms TTL=249 Мы в задаче один Мы в задаче два Reply from 87.250.250.242: bytes=32 time=16ms TTL=249 Мы в задаче один Мы в задаче два Reply from 87.250.250.242: bytes=32 time=17ms TTL=249 Мы в задаче один Мы в задаче два Reply from 87.250.250.242: bytes=32 time=16ms TTL=249 Мы в задаче один Мы в задаче два

Мы в задаче один Мы в задаче два Ping statistics for 87.250.250.242: Мы в задаче один Мы в задаче два Packets: Sent = 4, Received = 4, Lost = 0 (0% loss), Мы в задаче один Мы в задаче два Approximate round trip times in milli-seconds: Мы в задаче один Мы в задаче два Minimum = 16ms, Maximum = 17ms, Average = 16ms Мы в задаче один Мы в задаче два

Предающая задача завершает работу. Принимающая задача завершает работу. Все задачи завершены.

aepot
  • 49,560
  • 2
    Task не является оберткой над Thread. Task не создает поток. Task, она создает новые потоки только при необходимости, а когда завершается, не завершает поток, а оставляет его в режиме ожидания в пуле потоков ThreadPool это попросту неверно. – tym32167 May 11 '20 at 02:49
  • Task просто уделывает ручное создание потоков по производительности в разы. это как сравнивать теплое с мягким. – tym32167 May 11 '20 at 02:50
  • Зато про мьютекс я с вами согласен. – tym32167 May 11 '20 at 02:52
  • У автора задача похожа на типичный Producer/Consumer pattern – tym32167 May 11 '20 at 02:53
  • Task создает собственный поток только когда запущена с флагом TaskCreationOptions.LongRunning (в противном случае она выполняется на потоках из ThreadPool). Впрочем, делегат Process.OutputDataReceived и так по умолчанию запускается на ThreadPool. Мне кажется, здесь нет необходимости ни в Task, ни в Thread. task1 у вас бесполезно ждет в WaitForExit, а работу task2 мог бы выполнять основной поток. – MSDN.WhiteKnight May 11 '20 at 06:03
  • 1
    Я согласен с tym32167, утверждение что Task является оптимизированной оберткой над Thread лучше убрать. Оно может быть воспринято неправильно, как будто любой Thread можно преобразовать в Task, но это не так. Если поток выполнял длительную операцию, то перекладывание ее на Task (без флага LongRunning) может привести к исчерпанию ThreadPool. – MSDN.WhiteKnight May 11 '20 at 06:06
  • @MSDN.WhiteKnight спасибо, убрал. – aepot May 11 '20 at 06:40
  • @MSDN.WhiteKnight по поводу того, что здесь можно обойтись вообще без тасок, я знаю, но в учебных целях пытался создать нечто похожее на то, что показал автор с явным запуском задач. И спасибо, что показали мне мой низкий уровень компетенций в знаниях о ThreadPool, я уже это изучил. Остался только один любознательный вопрос: у меня 8 ядер, откуда практически моментально берется 32767 потокв в пуле при запуске приложения, если запуск одного - это ~30мс. Или это просто размер пула, а потоки стартуют по мере надобности? В прочем, всё работает, и ладно. :) – aepot May 11 '20 at 09:04
  • а как вы получаете число 32767? Это действительно больше похоже на максимальное, чем на текущее. – MSDN.WhiteKnight May 11 '20 at 10:06
  • @MSDN.WhiteKnight это и есть максимальное, просто не до конца понятно, что это значит. – aepot May 11 '20 at 10:20
  • 1
    Видимо 32767 - это значение по умолчанию для максимального числа потоков (которое задается параметром конфигурации https://docs.microsoft.com/en-us/dotnet/core/run-time-config/threading#maximum-threads ) - т.е. по умолчанию размер пула ограничен только размером переменной Int16. Например, для ASP.NET приложений, запускаемых в IIS, это значение будет меньше. Это теоретический максимум, а не реальное число созданных потоков (последнее можно получить как Maximum минус Available). – MSDN.WhiteKnight May 11 '20 at 10:57