1

Подскажите, пожалуйста, как мне сделать отправку http запросов на основе сигналов с использованием ManualResetEventSlim? На вход поступает сообщение, которое добавляется в ConcurrentQueue мне при обработке очереди нужно вызвать вызвать асинхронный метод отправки http запроса

public class WebApiSender
{
    private readonly HttpClient _httpClient;
    private readonly ConcurrentQueue<SenderMessage> _concurrentQueue;
public WebApiSender()
{
    _httpClient = new HttpClient();

    _concurrentQueue = new ConcurrentQueue&lt;SenderMessage&gt;();
}

public void AddMessage(SenderMessage message)
{
    _concurrentQueue.Enqueue(message);

    DoWork();
}

private void DoWork()
{
    var signal = new ManualResetEventSlim(false);

    while (_concurrentQueue.Count &gt; 0)
    {
        if (_concurrentQueue.TryDequeue(out var message))
        {
            Task.Run(() =&gt;
            {
                SendMessage(message).GetAwaiter().GetResult();
                signal.Wait();
                signal.Dispose();
            });
        }
    }

    signal.Set();

}


private async Task SendMessage(SenderMessage message)
{
    const string url = &quot;http://localhost:5087/api/Notification/GetNotification&quot;;

    string request = JsonSerializer.Serialize(message);

    var response = await _httpClient.PostAsync(url, new StringContent(request));

    if (response.StatusCode == HttpStatusCode.OK)
    {
        Console.WriteLine(await response.Content.ReadAsStringAsync());
    }
}

}

public class SenderMessage { public string Message { get; set; } }

Upd

Пришёл к такой реализации осталось только избавится от бесконечного цикла, направили на путь с использованием CancellationToken'a, но как его тут применить пока не знаю.

public class TelegramSender : ITelegramSender
{
    private readonly ILogger _logger;
    private readonly string _url;
    private readonly HttpClient _httpClient;
    private readonly ConcurrentQueue<ITelegramMessage> _concurrentQueue;
    private readonly ManualResetEventSlim _manualResetEventSlim;
public TelegramSender(ILogger logger, string url)
{
    if(string.IsNullOrWhiteSpace(url))
        throw new ArgumentException(&quot;Value cannot be empty or a space&quot;, nameof(url));

    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _url = url;
    _httpClient = new HttpClient();

    _concurrentQueue = new ConcurrentQueue&lt;ITelegramMessage&gt;();

    _manualResetEventSlim = new ManualResetEventSlim(false);

    Task.Run(DoWork);
}

public void AddMessage(ITelegramMessage message)
{
    _concurrentQueue.Enqueue(message);

    _manualResetEventSlim.Set();
}

private void DoWork()
{
    while (true)
    {
        _manualResetEventSlim.Wait();

        while (_concurrentQueue.Count &gt; 0)
        {
            if (_concurrentQueue.TryDequeue(out ITelegramMessage message))
            {
                Task.WaitAll(SendMessage(message));
            }
        }

        _manualResetEventSlim.Reset();
    }
}

private async Task SendMessage(ITelegramMessage telegramMessage)
{
    try
    {
        string url = $&quot;{_url}/api/Message/Send&quot;;

        _logger.Info($&quot;HTTP post request to '{url}'&quot;);

        var notification = JsonConvert.SerializeObject(telegramMessage);

        using (var httpContent = new StringContent(notification, Encoding.UTF8, &quot;application/json&quot;))
        {
            using (var request = new HttpRequestMessage(HttpMethod.Post, url) { Content = httpContent })
            {
                var response = await _httpClient.SendAsync(request);

                if (!response.IsSuccessStatusCode)
                {
                    _logger.Warn($&quot;HTTP post request status: {response.StatusCode}&quot;);
                }


                _logger.Info($&quot;HTTP post request to '{url}' was delivered&quot;);
            }
        }
    }
    catch (Exception e)
    {
        _logger.Error(e.Message);
    }
}

}

  • 1
    Вам нужно познакомиться с шаблоном проектирования Producer/Consumer. Он вам ответит на основной вопрос. А так - вот пример, возмите тот что на основе SemaphoreSlim. Сам по себе ManualResetEventSlim не имеет смысла, когда речь идет об асинхронных операциях, так как он является блокирующим примитивом синхронизации. – aepot Jan 29 '22 at 13:13
  • 1
    .GetAwaiter().GetResult(); называется sync over async - принудительная блокировка потока в ожидании завершения таска. Это очень плохая практика, никогда не делайте так. Опять же пример по ссылке выше избавит вас от этого костыля. – aepot Jan 29 '22 at 13:16
  • 1
    Еще пара подсказок: 1) if (response.StatusCode == HttpStatusCode.OK) замените на if (response.IsSuccessStatusCode). 2) вы забыли про IDisposable - using var response = await _httpClient.PostAsync(...); – aepot Jan 29 '22 at 13:46
  • @aepot, про последний коммент, я это уже добавил, спасибо) Сейчас смотрю пример, который вы отправили – Fall Dimka Jan 29 '22 at 14:14
  • @aepot, всё-таки я думаю тут стоит применять ManualResetEventSlim, но возможно я не прав. На данный момент я пока не знаю как решить данную задачу, посмотрел ваш пример, мне кажется он мне не подходит – Fall Dimka Jan 29 '22 at 15:16
  • @aepot, я обновил вопрос, добавив своё решение, посмотрите, пожалуйста) – Fall Dimka Jan 31 '22 at 07:24
  • 1
    Task.WhenAll - асинхнонная операция, либо используйте await, либо Task.WaitAll вместо нее. Этот код нерабочий. – aepot Jan 31 '22 at 07:28
  • @aepot, посмотрите ещё раз, пожалуйста – Fall Dimka Jan 31 '22 at 10:44
  • И этот тоже нерабочий, вы последовательно ждете каждую задачу, при чем синхронно, то есть одновременно будет работать только одна. Где массив тасков? Еще раз внимательно изучите пример по ссылке выше. Его вообще почти "как есть" можно использовать. Зачем мудрите - непонятно. – aepot Jan 31 '22 at 10:48

0 Answers0