Running synchronous tasks on a separate thread in FIFO order
Hey guys,
I want to handle synchronous tasks that will run on another thread while the main thread is running. The separate tasks will write logs, but it should be in order - FIFO.
I wrote the following class, but I'm not sure how I can send the tasks as synchronous ones:
Any suggestions on how I can send a synchronous task to the Enqueue method? Thanks!
are you saying the logs should be in order? like in the order of when the tasks were started?
because if only the logs are the issue then probably you could solve that in another way
not only logs. it should run in order, yes
im not convinced about ProcessQueue and WaitUntilEmpty, but i would like to understand what you mean by sync task first
sync task means that i need to use a method that does not run asynchronously, like writing to my db or a file
ConcurrentQueue is already thread safe, do we really need SemaphoreSlim?
I have exactly the same code and am currently thinking of removing SemaphoreSlim for performance improvements
not sure about it. maybe i don't need it. i just want to make sure that the next task runs only when the current one finishes
I am not 100% sure either, but it looks like it's not necessary (I created a code review post)
ok, i will check that. Anyhow, if the task is a synchronous one, should the queue hold Action or Task?
have you looked into the Channel<T> API?
it seems like a strong candidate for your use-case here
i personally use the idiom of a BackgroundService which has an injected ChannelReader<T> a lot. other code can push messages into the ChannelWriter<T> for simple in-memory queuing
An Introduction to System.Threading.Channels (Stephen Toub - MSFT, .NET Blog)
any idea about SemaphoreSlim, is it needed? @Becquerel
i generally see using low-level stuff like that, without a clear understanding of whether you need it, as a red flag in and of itself
channels handle the synchronization internally, which seems to be the OP's goal, so you shouldn't need your own semaphore on top
looks good, but I'm not sure if this fits my case. How can I handle the readers? I need some mechanism that will check all the time if someone writes something, don't I?
the way you check if something has written to the queue is with await/async
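// Inject these via DI
private readonly ChannelReader<SomeRequestType> _channel;
private readonly ILogger _logger; // assumed: the snippet logs through an injected ILogger

// This is in some class that inherits from BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Items come out in the order they were written; the loop awaits while the channel is empty
    await foreach (var request in _channel.ReadAllAsync(stoppingToken))
    {
        try
        {
            await YourCodeHere(request, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Typically means shutdown was requested, so stop processing
            break;
        }
        catch (Exception ex)
        {
            // Log and keep going so one bad request doesn't stop the consumer
            _logger.LogError(ex, "Error processing request");
        }
    }
}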
the .ReadAllAsync() method returns an IAsyncEnumerable<T>, so when there is nothing in the queue the loop just awaits the next item instead of blocking a thread or polling
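to make the "no polling needed" point concrete, here's a minimal sketch of my own (a plain console program with top-level statements, not the BackgroundService above; the variable names are just for illustration). it starts a read on an empty channel, shows that the read is simply pending, and then completes it by writing:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo channel; string is used only to keep the example small.
var channel = Channel.CreateUnbounded<string>();

// Start reading while the channel is still empty.
// Nothing blocks here: we just get back a pending ValueTask.
ValueTask<string> pendingRead = channel.Reader.ReadAsync();
bool readyBeforeWrite = pendingRead.IsCompleted; // false, nothing has been written yet

// Writing an item is what completes the pending read.
channel.Writer.TryWrite("hello");
string message = await pendingRead;

Console.WriteLine($"{readyBeforeWrite} {message}"); // False hello
```

ReadAllAsync is built on the same idea, so the reader never has to check "all the time" whether someone wrote something.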
the DI registration would look something like this
var channel = Channel.CreateBounded<SomeRequestType>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait
});
builder.Services.AddSingleton(channel.Reader);
builder.Services.AddSingleton(channel.Writer);
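if it helps, here's a rough sketch of what FullMode = Wait means on the writer side. this is my own toy example, assuming a capacity of 2 and int items instead of SomeRequestType, run as a console program with top-level statements:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Tiny capacity so the "full" case is easy to hit (assumption for the demo).
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

bool first = channel.Writer.TryWrite(1);  // true
bool second = channel.Writer.TryWrite(2); // true
bool third = channel.Writer.TryWrite(3);  // false: channel is full and Wait mode never drops items

// WriteAsync does not fail when the channel is full; it waits for free space.
ValueTask pendingWrite = channel.Writer.WriteAsync(3);
bool completedWhileFull = pendingWrite.IsCompleted; // false

// Reading one item frees a slot, which lets the pending write finish.
int item = await channel.Reader.ReadAsync(); // 1, the oldest item
await pendingWrite;

Console.WriteLine($"{first} {second} {third} {completedWhileFull} {item}");
// True True False False 1
```

so producers that use WriteAsync get backpressure for free: they slow down to the consumer's pace instead of growing the queue without limit.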
you could of course make the channel unbounded if you wanted to
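and for the original question (Action or Task for synchronous work), one possible shape is an unbounded Channel<Action> with a single consumer. this is only a sketch under my own assumptions: a console program with top-level statements, a List<string> standing in for the db or file writes, and names like log and consumer made up for the example:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Queue of synchronous work items.
var channel = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions
{
    SingleReader = true // we promise there is exactly one consumer
});

var log = new List<string>(); // stand-in for the real log/db/file target

// Single consumer on a background thread: it runs one Action at a time,
// so the next item only starts after the current one has finished.
Task consumer = Task.Run(async () =>
{
    await foreach (Action work in channel.Reader.ReadAllAsync())
    {
        try { work(); }
        catch (Exception ex) { Console.Error.WriteLine(ex); }
    }
});

// Producer (main thread): TryWrite succeeds on an unbounded channel that is not completed.
for (int i = 1; i <= 3; i++)
{
    int n = i; // capture a copy for the lambda
    channel.Writer.TryWrite(() => log.Add($"task {n}"));
}

channel.Writer.Complete(); // no more work will be queued
await consumer;            // plays the role of "wait until empty"

Console.WriteLine(string.Join(", ", log)); // task 1, task 2, task 3
```

since only one loop ever executes the items, ordering and "one at a time" both fall out of the design, without a SemaphoreSlim.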