Подскажите, пожалуйста, как мне сделать отправку http запросов на основе сигналов с использованием ManualResetEventSlim? На вход поступает сообщение, которое добавляется в ConcurrentQueue мне при обработке очереди нужно вызвать вызвать асинхронный метод отправки http запроса
public class WebApiSender
{
private readonly HttpClient _httpClient;
private readonly ConcurrentQueue<SenderMessage> _concurrentQueue;
public WebApiSender()
{
_httpClient = new HttpClient();
_concurrentQueue = new ConcurrentQueue<SenderMessage>();
}
public void AddMessage(SenderMessage message)
{
_concurrentQueue.Enqueue(message);
DoWork();
}
private void DoWork()
{
var signal = new ManualResetEventSlim(false);
while (_concurrentQueue.Count > 0)
{
if (_concurrentQueue.TryDequeue(out var message))
{
Task.Run(() =>
{
SendMessage(message).GetAwaiter().GetResult();
signal.Wait();
signal.Dispose();
});
}
}
signal.Set();
}
private async Task SendMessage(SenderMessage message)
{
const string url = "http://localhost:5087/api/Notification/GetNotification";
string request = JsonSerializer.Serialize(message);
var response = await _httpClient.PostAsync(url, new StringContent(request));
if (response.StatusCode == HttpStatusCode.OK)
{
Console.WriteLine(await response.Content.ReadAsStringAsync());
}
}
}
public class SenderMessage
{
public string Message { get; set; }
}
Upd
Пришёл к такой реализации осталось только избавится от бесконечного цикла, направили на путь с использованием CancellationToken'a, но как его тут применить пока не знаю.
public class TelegramSender : ITelegramSender
{
private readonly ILogger _logger;
private readonly string _url;
private readonly HttpClient _httpClient;
private readonly ConcurrentQueue<ITelegramMessage> _concurrentQueue;
private readonly ManualResetEventSlim _manualResetEventSlim;
public TelegramSender(ILogger logger, string url)
{
if(string.IsNullOrWhiteSpace(url))
throw new ArgumentException("Value cannot be empty or a space", nameof(url));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_url = url;
_httpClient = new HttpClient();
_concurrentQueue = new ConcurrentQueue<ITelegramMessage>();
_manualResetEventSlim = new ManualResetEventSlim(false);
Task.Run(DoWork);
}
public void AddMessage(ITelegramMessage message)
{
_concurrentQueue.Enqueue(message);
_manualResetEventSlim.Set();
}
private void DoWork()
{
while (true)
{
_manualResetEventSlim.Wait();
while (_concurrentQueue.Count > 0)
{
if (_concurrentQueue.TryDequeue(out ITelegramMessage message))
{
Task.WaitAll(SendMessage(message));
}
}
_manualResetEventSlim.Reset();
}
}
private async Task SendMessage(ITelegramMessage telegramMessage)
{
try
{
string url = $"{_url}/api/Message/Send";
_logger.Info($"HTTP post request to '{url}'");
var notification = JsonConvert.SerializeObject(telegramMessage);
using (var httpContent = new StringContent(notification, Encoding.UTF8, "application/json"))
{
using (var request = new HttpRequestMessage(HttpMethod.Post, url) { Content = httpContent })
{
var response = await _httpClient.SendAsync(request);
if (!response.IsSuccessStatusCode)
{
_logger.Warn($"HTTP post request status: {response.StatusCode}");
}
_logger.Info($"HTTP post request to '{url}' was delivered");
}
}
}
catch (Exception e)
{
_logger.Error(e.Message);
}
}
}
SemaphoreSlim. Сам по себеManualResetEventSlimне имеет смысла, когда речь идет об асинхронных операциях, так как он является блокирующим примитивом синхронизации. – aepot Jan 29 '22 at 13:13.GetAwaiter().GetResult();называется sync over async - принудительная блокировка потока в ожидании завершения таска. Это очень плохая практика, никогда не делайте так. Опять же пример по ссылке выше избавит вас от этого костыля. – aepot Jan 29 '22 at 13:16if (response.StatusCode == HttpStatusCode.OK)замените наif (response.IsSuccessStatusCode). 2) вы забыли проIDisposable-using var response = await _httpClient.PostAsync(...);– aepot Jan 29 '22 at 13:46