1

Как использовать структуру Queue в разных процессах (потоках) ??

Я как то накинул простейший пример, но увы почему-то не выходит передать структуру(Queue) в основной поток. Я на самом деле понимаю как вывести на монитор данные из потока Console.WriteLine(e.Data); но я хочу передавать их между потоками, а примеров(нормальных, а ни вечная теория) у вы как то не много.

Вот код :

using System.Text;
using System;
using System.IO;
using System.Diagnostics;
using System.Collections.Generic;

public class Test
{
    // Вот структура с которой невозможно работать. Как с ней работать ?? 
    struct QUE
    {
        Queue<string> numbers;
    }

    Queue<string> numbers = new Queue<string>(); 

    public static void Main()
    {
        // оформляем процесс 
        ProcessStartInfo startInfo = new ProcessStartInfo()
        {
            FileName = "ping", // имя запускаемого процесса
            Arguments = "www.ya.ru",  // аргументы процесса
            // настройки что именно от процесса " получать "
            RedirectStandardOutput = true,
            UseShellExecute = false,
            CreateNoWindow = true
        };
        // Какая то неизвестная для меня конструкция , увы так и непонял 
        // как она работает, по этому расписал как вижу :3 
        using (Process sevenZip = new Process())
        {
            // Думаю что соразмерно  Process sevenZip = Process.Start(startInfo);
            sevenZip.StartInfo = startInfo;
            // Принимаем данные (сообщения) от запущенного процесса 
            sevenZip.OutputDataReceived += proc_OutputDataReceived;
            // запускаем процесс
            sevenZip.Start();
            //получаем ответ запущенного процесса
            sevenZip.BeginOutputReadLine();
            // закрываем процесс
            sevenZip.WaitForExit();
        }
        //File.Open("log.txt");

        for(; ; )
        {
            Wiwod();
        }
    }

    private static void proc_OutputDataReceived(object sender, DataReceivedEventArgs e)
    {
        QUE wqe = new QUE();

        numbers.Enqueue(e.Data);

        // также пишем полученные данные в лог файл 
        using (var wr = File.AppendText("log.txt"))
            wr.WriteLine(e.Data);
    }

    public static void Wiwod()
    {
        Console.WriteLine(numbers.Dequeue());
    }
}

Вот вывод при компиляции :

dima@komp:~/mita/TCP_IP$ mcs lovla_terminal5.cs 
lovla_terminal5.cs(4,1): warning CS0105: The using directive for `System' appeared previously in this namespace
lovla_terminal5.cs(5,1): warning CS0105: The using directive for `System.IO' appeared previously in this namespace
lovla_terminal5.cs(64,9): error CS0120: An object reference is required to access non-static member `Test.numbers'
lovla_terminal5.cs(73,27): error CS0120: An object reference is required to access non-static member `Test.numbers'
Compilation failed: 2 error(s), 2 warnings

Спасибо Александр(@AlexanderPetrov) за ответ , но почемуто у меня не вышло получить вывод :(

ещё я слегка изменил метод

    private static void OutputDataReceived(object sender, DataReceivedEventArgs e)
    {
        data.Enqueue(e.Data);
        // увы, но вывод мне нужен в самом DataOutput()
        //Console.WriteLine(e.Data);

        using (var writer = File.AppendText("log.txt"))
            writer.WriteLine(e.Data);
    }

Вот вывод (так же забыл упомянуть что работаю в линуксе, останавливал CTRL+/):

dima@komp:~/mita/TCP_IP$ mono lovla_terminal6.exe 
^C
dima@komp:~/mita/TCP_IP$ mono lovla_terminal6.exe 
11/11 packets, 0% loss, min/avg/ewma/max = 8.429/10.521/10.986/15.315 ms
Full thread dump:

"<threadpool thread>"
"Finalizer"
"Thread Pool Worker"  at <unknown> <0xffffffff>
  at (wrapper managed-to-native) System.IO.MonoIO.Read (intptr,byte[],int,int,System.IO.MonoIOError&) [0x00000] in <ab4b6f8474df465bb9c200aa76d63763>:0
  at System.IO.MonoIO.Read (System.Runtime.InteropServices.SafeHandle,byte[],int,int,System.IO.MonoIOError&) <0x0007f>
  at System.IO.FileStream.ReadData (System.Runtime.InteropServices.SafeHandle,byte[],int,int) <0x0004b>
  at System.IO.FileStream.ReadInternal (byte[],int,int) <0x0008d>
  at System.IO.FileStream.Read (byte[],int,int) <0x000c5>
  at System.IO.Stream/<>c.<BeginReadInternal>b__40_0 (object) <0x000ca>
  at System.Threading.Tasks.Task`1<int>.InnerInvoke () [0x0002b] in <ab4b6f8474df465bb9c200aa76d63763>:0
  at System.Threading.Tasks.Task.Execute () <0x00039>
  at System.Threading.Tasks.Task.ExecutionContextCallback (object) <0x0004f>
  at System.Threading.ExecutionContext.RunInternal (System.Threading.ExecutionContext,System.Threading.ContextCallback,object,bool) <0x0014d>
  at System.Threading.ExecutionContext.Run (System.Threading.ExecutionContext,System.Threading.ContextCallback,object,bool) <0x00041>
  at System.Threading.Tasks.Task.ExecuteWithThreadLocal (System.Threading.Tasks.Task&) <0x000eb>
  at System.Threading.Tasks.Task.ExecuteEntry (bool) <0x0014d>
  at System.Threading.Tasks.Task.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem () <0x00020>
  at System.Threading.ThreadPoolWorkQueue.Dispatch () [0x00074] in <ab4b6f8474df465bb9c200aa76d63763>:0
  at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback () <0x00018>
  at (wrapper runtime-invoke) <Module>.runtime_invoke_bool (object,intptr,intptr,intptr) [0x0002a] in <ab4b6f8474df465bb9c200aa76d63763>:0

"<unnamed thread>"  at <unknown> <0xffffffff>
  at (wrapper managed-to-native) System.Threading.WaitHandle.Wait_internal (intptr*,int,bool,int) [0x00000] in <ab4b6f8474df465bb9c200aa76d63763>:0
  at System.Threading.WaitHandle.WaitOneNative (System.Runtime.InteropServices.SafeHandle,uint,bool,bool) <0x0010b>
  at System.Threading.WaitHandle.InternalWaitOne (System.Runtime.InteropServices.SafeHandle,long,bool,bool) <0x0003f>
  at System.Threading.WaitHandle.WaitOne (int,bool) <0x00036>
  at System.Diagnostics.Process.WaitForExit (int) [0x00024] in <e3496715a34a436dabd98136e5f36660>:0
  at System.Diagnostics.Process.WaitForExit () [0x00000] in <e3496715a34a436dabd98136e5f36660>:0
  at (wrapper remoting-invoke-with-check) System.Diagnostics.Process.WaitForExit () [0x00031] in <e3496715a34a436dabd98136e5f36660>:0
  at ConApp1.Program.Main () [0x00070] in <e0e1d6ce3b494d84a67d1194a63c7669>:0
  at (wrapper runtime-invoke) object.runtime_invoke_void (object,intptr,intptr,intptr) [0x0002a] in <ab4b6f8474df465bb9c200aa76d63763>:0

"Thread Pool Worker"
"Thread Pool Worker"
"Thread Pool Worker"
^C
dima@komp:~/mita/TCP_IP$ 

Вот прпробывал сам тдвумя потоками направить в нужное русло но снов аничего не вышло

using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Threading;

namespace ConApp1
{
    class Program
    {
        static object lockOn = new object(); //закрытый обьект, доступный для последующий блокировки
        static readonly Queue<string> data = new Queue<string>();

       static void Main()
        {
            ProcessStartInfo startInfo = new ProcessStartInfo()
            {
                FileName = "ping",
                Arguments = "www.ya.ru",
                RedirectStandardOutput = true,
                UseShellExecute = false,
                CreateNoWindow = true
            };

            using (var pingProcess = new Process())
            {
                pingProcess.StartInfo = startInfo;
                pingProcess.OutputDataReceived += OutputDataReceived;
                pingProcess.Start();
                pingProcess.BeginOutputReadLine();

                // На этом методе будет ожидание завершеня запущенного процесса.
                // При получении каждой новой порции данных
                // будет вызываться событие OutputDataReceived.
                pingProcess.WaitForExit();
            }

            string name = "второй поток";
            Thread Thrd = new Thread(Run);
            Thrd.Name = name; // задать имя потока
            Thrd.Start(); // начать поток

            Console.WriteLine("Процесс ping завершён.");
        }

        private static void OutputDataReceived(object sender, DataReceivedEventArgs e)
        {
            Console.WriteLine("Мы в потоке один");

                data.Enqueue(e.Data);
                //Console.WriteLine(e.Data);
                using (var writer = File.AppendText("log.txt"))
                    writer.WriteLine(e.Data);

        }

        // Entry point of thread. 
        static void Run()
        {
            Console.WriteLine("Мы в потоке два");
            lock (lockOn)
            {
                while (data.Count > 0)
                    Console.WriteLine(data.Dequeue());
            }
        }

    }

}

вот вывод :

dima@komp:~/mita/TCP_IP$ mcs lovla_terminal6.cs 
dima@komp:~/mita/TCP_IP$ mono lovla_terminal6.exe 
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
25/25 packets, 0% loss, min/avg/ewma/max = 8.501/14.626/14.514/97.683 ms
Full thread dump:

"<unnamed thread>"  at <unknown> <0xffffffff>
  at (wrapper managed-to-native) System.Threading.WaitHandle.Wait_internal (intptr*,int,bool,int) [0x00000] in <ab4b6f8474df465bb9c200aa76d63763>:0
  at System.Threading.WaitHandle.WaitOneNative (System.Runtime.InteropServices.SafeHandle,uint,bool,bool) <0x0010b>
  at System.Threading.WaitHandle.InternalWaitOne (System.Runtime.InteropServices.SafeHandle,long,bool,bool) <0x0003f>
  at System.Threading.WaitHandle.WaitOne (int,bool) <0x00036>
  at System.Diagnostics.Process.WaitForExit (int) [0x00024] in <e3496715a34a436dabd98136e5f36660>:0
  at System.Diagnostics.Process.WaitForExit () [0x00000] in <e3496715a34a436dabd98136e5f36660>:0
  at (wrapper remoting-invoke-with-check) System.Diagnostics.Process.WaitForExit () [0x00031] in <e3496715a34a436dabd98136e5f36660>:0
  at ConApp1.Program.Main () [0x00070] in <c4eb53bee8824264b0c25a4b27c5c291>:0
  at (wrapper runtime-invoke) object.runtime_invoke_void (object,intptr,intptr,intptr) [0x0002a] in <ab4b6f8474df465bb9c200aa76d63763>:0

"Thread Pool Worker"
"Thread Pool Worker"
"<threadpool thread>"
"Thread Pool Worker"  at <unknown> <0xffffffff>
  at (wrapper managed-to-native) System.IO.MonoIO.Read (intptr,byte[],int,int,System.IO.MonoIOError&) [0x00000] in <ab4b6f8474df465bb9c200aa76d63763>:0
  at System.IO.MonoIO.Read (System.Runtime.InteropServices.SafeHandle,byte[],int,int,System.IO.MonoIOError&) <0x0007f>
  at System.IO.FileStream.ReadData (System.Runtime.InteropServices.SafeHandle,byte[],int,int) <0x0004b>
  at System.IO.FileStream.ReadInternal (byte[],int,int) <0x0008d>
  at System.IO.FileStream.Read (byte[],int,int) <0x000c5>
  at System.IO.Stream/<>c.<BeginReadInternal>b__40_0 (object) <0x000ca>
  at System.Threading.Tasks.Task`1<int>.InnerInvoke () [0x0002b] in <ab4b6f8474df465bb9c200aa76d63763>:0
  at System.Threading.Tasks.Task.Execute () <0x00039>
  at System.Threading.Tasks.Task.ExecutionContextCallback (object) <0x0004f>
  at System.Threading.ExecutionContext.RunInternal (System.Threading.ExecutionContext,System.Threading.ContextCallback,object,bool) <0x0014d>
  at System.Threading.ExecutionContext.Run (System.Threading.ExecutionContext,System.Threading.ContextCallback,object,bool) <0x00041>
  at System.Threading.Tasks.Task.ExecuteWithThreadLocal (System.Threading.Tasks.Task&) <0x000eb>
  at System.Threading.Tasks.Task.ExecuteEntry (bool) <0x0014d>
  at System.Threading.Tasks.Task.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem () <0x00020>
  at System.Threading.ThreadPoolWorkQueue.Dispatch () [0x00074] in <ab4b6f8474df465bb9c200aa76d63763>:0
  at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback () <0x00018>
  at (wrapper runtime-invoke) <Module>.runtime_invoke_bool (object,intptr,intptr,intptr) [0x0002a] in <ab4b6f8474df465bb9c200aa76d63763>:0

"Finalizer"
"Thread Pool Worker"Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
^X^Z
[1]+  Остановлен    mono lovla_terminal6.exe

Вот ещё реализация с мютексом.

// Применить мьютекс. 
using System;
using System.Threading;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;

// В этом классе содежится общий ресурс (переменная Queue), 
// а также мьютекс(Mtx), управляющий доступом к ней.  
class SharedRes
{
    //static readonly Queue<string> data = new Queue<string>();

    public static Queue<string> data = new Queue<string>();

    public static Mutex Mtx = new Mutex();
}

// В этом потоке переменная SharedRes.Queue инкрементируется. 
class IncThread
{
    int num;
    public Thread Thrd;

    public IncThread(string name)
    {
        Thrd = new Thread(this.Run);
        Thrd.Name = name;
        Thrd.Start();
    }

    // Точка входа в поток.  
    void Run()
    {
        ProcessStartInfo startInfo = new ProcessStartInfo()
        {
            FileName = "ping",
            Arguments = "www.ya.ru",
            RedirectStandardOutput = true,
            UseShellExecute = false,
            CreateNoWindow = true
        };

        using (var pingProcess = new Process())
        {
            pingProcess.StartInfo = startInfo;
            pingProcess.OutputDataReceived += OutputDataReceived;
            pingProcess.Start();
            pingProcess.BeginOutputReadLine();

            // На этом методе будет ожидание завершеня запущенного процесса.
            // При получении каждой новой порции данных
            // будет вызываться событие OutputDataReceived.
            pingProcess.WaitForExit();
        }
        Console.WriteLine(Thrd.Name + " ожидает мютекс.");
    }

    private static void OutputDataReceived(object sender, DataReceivedEventArgs e)
    {
        // Получить мьютекс. 
        SharedRes.Mtx.WaitOne();


        Console.WriteLine("Мы в потоке один");

        SharedRes.data.Enqueue(e.Data);
        //Console.WriteLine(e.Data);
        using (var writer = File.AppendText("log.txt"))
            writer.WriteLine(e.Data);

        // Освободить мьютекс. 
        SharedRes.Mtx.ReleaseMutex();
    }
}

// В этом потоке переменная SharedRes.Count декрементируеться. 
class DecThread
{
    int num;
    public Thread Thrd;

    public DecThread(string name)
    {
        Thrd = new Thread(new ThreadStart(this.Run));
        Thrd.Name = name;
        Thrd.Start();
    }

    // ТОчка входа в поток.  
    void Run()
    {
        // Получить мьютекс. 
        SharedRes.Mtx.WaitOne();

        Console.WriteLine("Мы в потоке два");

        while (SharedRes.data.Count > 0)
                Console.WriteLine(SharedRes.data.Dequeue());

        // Освободить мьютекс. 
        SharedRes.Mtx.ReleaseMutex();
    }
}

class MutexDemo
{
    static void Main()
    {
        // Construct three threads.  
        IncThread mt1 = new IncThread("Инкрементирующий поток");

        Thread.Sleep(1); // let the Increment thread start 

        DecThread mt2 = new DecThread("Декрементирующий поток");

        mt1.Thrd.Join();
        mt2.Thrd.Join();
    }
}

ВОт вывод :

dima@komp:~/mita/TCP_IP$ mcs lovla_terminal7.cs 
lovla_terminal7.cs(23,9): warning CS0169: The private field `IncThread.num' is never used
lovla_terminal7.cs(81,9): warning CS0169: The private field `DecThread.num' is never used
Compilation succeeded - 2 warning(s)
dima@komp:~/mita/TCP_IP$ mono lovla_terminal7.exe 
Мы в потоке два
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
Мы в потоке один
^C
dima@komp:~/mita/TCP_IP$ 
timob256
  • 2,907

2 Answers2

2

Если вы хотите работать в разных потоках, то это работа с разделённым ресурсом, например:

using System.Collections.Concurrent; // concurrent collection - thread-safe collection

public static class SomeClass
{
    public static ConcurrentQueue<int> Queue { get; }
}

Если вы хотите работать в разных процессах, то тут понадобится пайпинг (если процесс относятся друг к другу как родитель-ребёнок) (ссылка).


И пожалуйста, именуйте всё в вашем коде как гласит code-style данного языка (в данном случае - c#):

  • proc_SomeProc -> SomeProc / Process_SomeProc
  • QUE -> Que -> Queue
  • преобразовать
struct Queue
{
    public Queue<int> Queue;
}

в

// in Program class
//...
public static Queue<int> queue;
  • Wiwod -> WriteNumbers
return
  • 2,740
  • очень хороший ответ но увы я так и не смог им воспользоваться (решил поспать и поиграть в видио игры) – timob256 May 01 '20 at 09:49
  • @timob256 ??? так ответ помог вам или нет? может что-нибудь нужно доработать? – return May 01 '20 at 16:48
  • Увы я так и не смог запустить программу :( – timob256 May 05 '20 at 11:26
  • @timob256 Что вы имеете ввиду? У вас диск сгорел, или вы свой комп потеряли, или что? – return May 05 '20 at 14:55
  • посмотрите пожалуйста мой обновленный вопрос – timob256 May 10 '20 at 12:59
2

Я слегка подправил ваш код. В таком виде он работает.
Я постарался дать говорящие имена переменным.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;

namespace ConApp1
{
    class Program
    {
        static readonly Queue<string> data = new Queue<string>();

        static void Main()
        {
            ProcessStartInfo startInfo = new ProcessStartInfo()
            {
                FileName = "ping",
                Arguments = "www.ya.ru",
                RedirectStandardOutput = true,
                UseShellExecute = false,
                CreateNoWindow = true
            };

            using (var pingProcess = new Process())
            {
                pingProcess.StartInfo = startInfo;
                pingProcess.OutputDataReceived += OutputDataReceived;
                pingProcess.Start();
                pingProcess.BeginOutputReadLine();

                // На этом методе будет ожидание завершеня запущенного процесса.
                // При получении каждой новой порции данных
                // будет вызываться событие OutputDataReceived.
                pingProcess.WaitForExit();
            }

            Console.WriteLine("Процесс ping завершён.");

            DataOutput();
        }
        private static void OutputDataReceived(object sender, DataReceivedEventArgs e)
        {
            data.Enqueue(e.Data);

            Console.WriteLine(e.Data);

            using (var writer = File.AppendText("log.txt"))
                writer.WriteLine(e.Data);
        }

        public static void DataOutput()
        {
            Console.WriteLine("Вывод данных из очереди:");

            while (data.Count > 0)
                Console.WriteLine(data.Dequeue());
        }
    }
}

Какая то неизвестная для меня конструкция
using (Process sevenZip = new Process())

Тут создаётся экземпляр класса Process. Очевидно, что в том примере, который вы смотрели, запускалась программа 7zip, поэтому автор дал такое название. Я переименовал в pingProcess.

Этот класс является Disposable, поэтому крайне желательно вызывать метод Dispose, когда он больше не нужен. Именно для этого он обёрнут в оператор using - диспоз будет вызван автоматически даже в случае какого-либо исключения.


После запуска программы ваш код будет ждать её завершения на вызове метода WaitForExit. Лишь когда ping завершится, появится вывод строки Console.WriteLine("Процесс ping завершён.");

В это время будет периодически вызываться обработчик события OutputDataReceived. В нём обрабатываем каждую полученную порцию данных. В частности, данные заносятся в очередь, выводятся на консоль и пишутся в файл.

Кстати, File.AppendText будет при каждом вызове открывать и закрывать файл - это очень медленные операции, поэтому правильнее будет переписать код на использование StreamWriter'а (создаём и открываем его до запуска процесса ping, закрываем после завершения; ссылка хранится в поле).


В данном коде Queue используется в одном-единственном процессе - нашем. В посторонний процесс мы никак не можем её передать, да это и не нужно.

Надеюсь, прояснил ситуацию. Спрашивайте, если что-то ещё не понятно.