разбираюсь с BlockingCollection и построение конвейера обработки данных на его основе.
В официальном руководстве, приводятся пример первой стадии конвейера, в которой данные читаются из источника реализующего IEnumerable.
Я хотел разобраться, как сделать первую стадию конвейера, ожидающую получение данных от IObservable.
Для упрощения приведу только 2 стадии конвейера чтение и запись.
Сами методы обработки внутри стадии конвейера будут написаны заранее (до рефакторинга с применением BlockingCollection) и скорее всего будут асинхронные.
У меня вопрос по реализации метода ReadStringsFromObservableSourse - верно ли я его реализовал, т.е. обработка ошибок и отмена выполнения.
_ts.GetSourseStringObservable() - выдает IObservable<string>
В моем случае я вручную реализовал поступление данных через GetSourseStringObservable() используя класс SensorPolling, который периодически опрашивает какой-то ресурс.
Но, в реальном случае, скорее всего будут нативные реализации IObservable, например, ожидание данных с очередей обмена или каких то внешних ресурсов.
Вот меня интересуют все подводные камни которые могут быть в ожидании данных в конвейере.
public class Pipline
{
private readonly TextSource _ts;
public Pipline(TextSource ts)
{
_ts = ts;
}
public async Task DoPipeline(CancellationToken token)
{
int seed = 5;
int BufferSize = 50;
var buffer1 = new BlockingCollection<string>(BufferSize);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var stage1 = f.StartNew(() => ReadStringsFromObservableSourse(buffer1, seed, cts).GetAwaiter().GetResult());
var stage2 = f.StartNew(() => WriteSentences(buffer1, cts).GetAwaiter().GetResult());
await Task.WhenAll(stage1, stage2);
}
private async Task ReadStringsFromObservableSourse(BlockingCollection<string> output, int seed, CancellationTokenSource cts)
{
var observableTask= new TaskCompletionSource<object>();
try
{
var ct = cts.Token;
using var observerLifeTime = _ts.GetSourseStringObservable().Take(seed).Subscribe(
i =>
{
if (ct.IsCancellationRequested)
{
observableTask.SetCanceled();
return;
}
output.Add(i, ct);
},
e=>
{
observableTask.SetException(e);
},
() =>
{
observableTask.SetResult(default);
}
);
await observableTask.Task.WaitAsync(ct);
}
catch (Exception e)
{
cts.Cancel(); //нотификация отмены остальным стадиям конвеера.
if (!(e is OperationCanceledException))
throw;
}
finally
{
output.CompleteAdding();
}
}
private async Task WriteSentences(BlockingCollection<string> input, CancellationTokenSource cts)
{
try
{
var token = cts.Token;
foreach (var phrase in input.GetConsumingEnumerable())
{
if (token.IsCancellationRequested) break;
await Task.Delay(100);
var result = phrase + " Stage_Write";
Console.WriteLine(result + " " + DateTime.Now.ToString("T")); //Запись полученного результата
}
}
catch (Exception e)
{
// If an exception occurs, notify all other pipeline stages.
cts.Cancel();
if (!(e is OperationCanceledException))
throw;
}
}
}
public class TextSource
{
public IObservable<string> GetSourseStringObservable()
{
var sensorFunc = new Func<CancellationToken, Task<string>>((ct) => GetValueFromRemoteSource("Get only new Values", ct));
var sp= new SensorPolling<string>(sensorFunc);
var xs=sp.Polling();
return xs;
}
private int i = 0;
private async Task<string> GetValueFromRemoteSource (string filter, CancellationToken ct)
{
//filter not Use
i++;
await Task.Delay(500, ct);
return i.ToString();
}
}
public class SensorPolling<T>
{
private readonly Func<CancellationToken, Task<T>> _tryTakeDataAsync;
public SensorPolling(Func<CancellationToken, Task<T>> tryTakeDataAsync)
{
_tryTakeDataAsync = tryTakeDataAsync;
}
public IObservable<T> Polling()
{
return Observable.Create<T>(
(obs, ct) =>
{
return Task.Run(async () =>
{
while (!ct.IsCancellationRequested)
{
try
{
var data = await _tryTakeDataAsync(ct);
obs.OnNext(data);
}
catch (OperationCanceledException)
{
obs.OnCompleted();
}
catch (Exception e)
{
obs.OnError(e);
}
}
Console.WriteLine("Loop terminated");
}, ct);
});
}
}
P.S. Реализация ReadStrings как в примере с офф. сайта получение данных через IEnumerable (_ts.GetNextString(seed)).
private async Task ReadStrings(BlockingCollection<string> output, int seed, CancellationTokenSource cts)
{
try
{
var ct = cts.Token;
foreach (var phrase in _ts.GetNextString(seed))
{
if (ct.IsCancellationRequested) break;
await Task.Delay(100);
var result = phrase + " Stage_Read"; //Работа стадии 1
Console.WriteLine(result + " " + DateTime.Now.ToString("T"));
output.Add(result, ct);
throw new Exception("gfhgdfgfd");
}
}
catch (Exception e)
{
// If an exception occurs, notify all other pipeline stages.
cts.Cancel();
if (!(e is OperationCanceledException))
throw;
}
finally
{
output.CompleteAdding();
}
}
var stage1 = f.StartNew(() => ReadStringsFromObservableSourse(buffer1, seed, cts).GetAwaiter().GetResult())записал вот такvar stage1 = ReadStringsFromObservableSourse(buffer1, seed, cts). ЗачемLongRunningпоток, который даже использоваться не будет, дорого это по ресурсам, и бессмысленно. – aepot Dec 08 '20 at 14:04System.Reactive.Linq.Observable.ForEachAsync, он уже иCancellationTokenпринимает, и дажеTaskвозвращает... – Pavel Mayorov Dec 08 '20 at 20:16BlockingCollection, еслиIObservable- уже конвейер? – Pavel Mayorov Dec 08 '20 at 20:17