0

разбираюсь с BlockingCollection и построение конвейера обработки данных на его основе. В официальном руководстве, приводятся пример первой стадии конвейера, в которой данные читаются из источника реализующего IEnumerable. Я хотел разобраться, как сделать первую стадию конвейера, ожидающую получение данных от IObservable.

Для упрощения приведу только 2 стадии конвейера чтение и запись. Сами методы обработки внутри стадии конвейера будут написаны заранее (до рефакторинга с применением BlockingCollection) и скорее всего будут асинхронные. У меня вопрос по реализации метода ReadStringsFromObservableSourse - верно ли я его реализовал, т.е. обработка ошибок и отмена выполнения.

_ts.GetSourseStringObservable() - выдает IObservable<string> 

В моем случае я вручную реализовал поступление данных через GetSourseStringObservable() используя класс SensorPolling, который периодически опрашивает какой-то ресурс. Но, в реальном случае, скорее всего будут нативные реализации IObservable, например, ожидание данных с очередей обмена или каких то внешних ресурсов. Вот меня интересуют все подводные камни которые могут быть в ожидании данных в конвейере.

public class Pipline
{
    private readonly TextSource _ts;       
    public Pipline(TextSource ts)
    {
        _ts = ts;
    }
public async Task DoPipeline(CancellationToken token)
{
    int seed = 5;
    int BufferSize = 50;
    var buffer1 = new BlockingCollection&lt;string&gt;(BufferSize);

    using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
    var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
    var stage1 = f.StartNew(() =&gt; ReadStringsFromObservableSourse(buffer1, seed, cts).GetAwaiter().GetResult());
    var stage2 = f.StartNew(() =&gt; WriteSentences(buffer1, cts).GetAwaiter().GetResult());

    await Task.WhenAll(stage1, stage2);
}


private async Task ReadStringsFromObservableSourse(BlockingCollection&lt;string&gt; output, int seed, CancellationTokenSource cts)
{
    var observableTask= new TaskCompletionSource&lt;object&gt;();
    try
    {
        var ct = cts.Token;
        using var observerLifeTime = _ts.GetSourseStringObservable().Take(seed).Subscribe(
            i =&gt;
            {
                if (ct.IsCancellationRequested)
                {
                    observableTask.SetCanceled();
                    return;
                }
                output.Add(i, ct);
            },
            e=&gt;
            {     
                observableTask.SetException(e);
            },
            () =&gt;
            {
                observableTask.SetResult(default);
            }
        );
        await observableTask.Task.WaitAsync(ct);
    }
    catch (Exception e)
    {
        cts.Cancel();      //нотификация отмены остальным стадиям конвеера.
        if (!(e is OperationCanceledException))
            throw;
    }
    finally
    {
        output.CompleteAdding();
    }
}


private async Task WriteSentences(BlockingCollection&lt;string&gt; input, CancellationTokenSource cts)
{
    try
    {
        var token = cts.Token;    
        foreach (var phrase in input.GetConsumingEnumerable())
        {
            if (token.IsCancellationRequested) break;
            await Task.Delay(100);
            var result = phrase + &quot;  Stage_Write&quot;;
            Console.WriteLine(result + &quot;   &quot; + DateTime.Now.ToString(&quot;T&quot;)); //Запись полученного результата
        }

    }
    catch (Exception e)
    {
        // If an exception occurs, notify all other pipeline stages.
        cts.Cancel(); 
        if (!(e is OperationCanceledException))
            throw;
    }
}

}

public class TextSource { public IObservable<string> GetSourseStringObservable() { var sensorFunc = new Func<CancellationToken, Task<string>>((ct) => GetValueFromRemoteSource("Get only new Values", ct)); var sp= new SensorPolling<string>(sensorFunc); var xs=sp.Polling(); return xs; }

private int i = 0;
private async Task&lt;string&gt; GetValueFromRemoteSource (string filter, CancellationToken ct)
{
    //filter not Use
    i++;
    await Task.Delay(500, ct);
    return i.ToString();
}

}

public class SensorPolling<T> { private readonly Func<CancellationToken, Task<T>> _tryTakeDataAsync;

public SensorPolling(Func&lt;CancellationToken, Task&lt;T&gt;&gt; tryTakeDataAsync)
{
    _tryTakeDataAsync = tryTakeDataAsync;
}

public IObservable&lt;T&gt; Polling()
{
    return Observable.Create&lt;T&gt;(
        (obs, ct) =&gt;
        {
            return Task.Run(async () =&gt;
            {
                while (!ct.IsCancellationRequested)
                {
                    try
                    {
                        var data = await _tryTakeDataAsync(ct);
                        obs.OnNext(data);
                    }
                    catch (OperationCanceledException)
                    {
                        obs.OnCompleted(); 
                    }
                    catch (Exception e)
                    {
                        obs.OnError(e);
                    }
                }
                Console.WriteLine(&quot;Loop terminated&quot;);
            }, ct);
        });
}

}

P.S. Реализация ReadStrings как в примере с офф. сайта получение данных через IEnumerable (_ts.GetNextString(seed)).

    private async Task ReadStrings(BlockingCollection<string> output, int seed, CancellationTokenSource cts)
    {
        try
        {
            var ct = cts.Token;
            foreach (var phrase in _ts.GetNextString(seed)) 
            {
                if (ct.IsCancellationRequested) break;
                await Task.Delay(100);
                var result = phrase + " Stage_Read"; //Работа стадии 1
                Console.WriteLine(result  + "   " + DateTime.Now.ToString("T"));
                output.Add(result, ct);
                throw new Exception("gfhgdfgfd");
            }
        }
        catch (Exception e)
        {
            // If an exception occurs, notify all other pipeline stages.
            cts.Cancel(); 
            if (!(e is OperationCanceledException))
                throw;
        }
        finally
        {
            output.CompleteAdding();
        }
    }
Aldmi
  • 1,925
  • Я бы вот это var stage1 = f.StartNew(() => ReadStringsFromObservableSourse(buffer1, seed, cts).GetAwaiter().GetResult()) записал вот так var stage1 = ReadStringsFromObservableSourse(buffer1, seed, cts). Зачем LongRunning поток, который даже использоваться не будет, дорого это по ресурсам, и бессмысленно. – aepot Dec 08 '20 at 14:04
  • А вообще, конвейер обработки данных - это шаблон программирования Producer/Consumer. А для реализации его асинхронно, есть отличный инструмент - System.Threading.Channels. – aepot Dec 08 '20 at 14:08
  • А что всё так сложно-то? Есть же метод System.Reactive.Linq.Observable.ForEachAsync, он уже и CancellationToken принимает, и даже Task возвращает... – Pavel Mayorov Dec 08 '20 at 20:16
  • Кстати, зачем делать конвейер на основе BlockingCollection, если IObservable - уже конвейер? – Pavel Mayorov Dec 08 '20 at 20:17
  • 'BlockingCollection' я хотел локально использовать там где уже есть последовательный конвейер обработки, заменить его на конвейер BlockingCollection'. А Observable - грубо говоря глобальный сервис поставщик данных для всех сервисов обработки кому надо эти данные. Т.е. самое простое это использовать 'Observable.ForEachAsync' и городить такой огород без острой нужды не стоит? – Aldmi Dec 09 '20 at 05:10
  • System.Threading.Channels. - почитаю, Спасибо – Aldmi Dec 09 '20 at 05:12

0 Answers0