C# (5mo ago)
Shemesh

Running synchronous tasks on separate thread with FIFO

Hey guys, I want to handle synchronous tasks that will run on another thread while the main thread is running. The separate tasks will write logs, but it should be in order (FIFO). I wrote the following class, but I'm not sure how I can send the tasks as synchronous ones:
c#
using System;
...

namespace mynamespace
{
    public class TaskQueue
    {
        private readonly ConcurrentQueue<Func<Task>> _tasks = new ConcurrentQueue<Func<Task>>();
        private readonly SemaphoreSlim _signal = new SemaphoreSlim(1);

        public void Enqueue(Func<Task> task)
        {
            _tasks.Enqueue(task);
            Task.Run(ProcessQueue);
        }

        public async Task EnqueueAsync(Func<Task> task)
        {
            _tasks.Enqueue(task);
            await ProcessQueue();
        }

        private async Task ProcessQueue()
        {
            await _signal.WaitAsync();
            try
            {
                if (_tasks.TryDequeue(out var task))
                {
                    await task();
                }
            }
            finally
            {
                _signal.Release();
            }
        }

        public int GetQueueCount()
        {
            return _tasks.Count;
        }

        public IEnumerable<Func<Task>> GetTasksInQueue()
        {
            return _tasks.ToArray();
        }

        public async Task WaitUntilEmpty()
        {
            int timeout = 300; // 30 seconds
            while (_tasks.Count > 0 && timeout >= 0)
            {
                await Task.Delay(100);
                timeout--;
            }
            if (timeout == 0)
            {
                Debug.WriteLine("TaskQueue did not empty in 30 seconds");
            }
        }
    }
}
Any suggestions on how I can pass a synchronous task to the Enqueue method? Thanks!
14 Replies
HtmlCompiler (5mo ago)
are you saying the logs should be in order? like in the order of when the tasks were started? because if only the logs are the issue then probably you could solve that in another way
Shemesh (OP, 5mo ago)
not only logs. it should run in order, yes
HtmlCompiler (5mo ago)
im not convinced about ProcessQueue and WaitUntilEmpty, but i would like to understand what you mean by sync task first
Shemesh (OP, 5mo ago)
sync task means that I need to call a method which does not run asynchronously, like writing to my db or a file
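One way to hand a synchronous method to the existing Enqueue(Func<Task>) signature is to wrap it in a lambda that returns an already-completed Task. A minimal sketch of that idea; AsTask and SyncWork are hypothetical names, not part of the class posted above:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SyncWork
{
    // Hypothetical helper: adapts a synchronous Action to the Func<Task>
    // shape that Enqueue(Func<Task>) expects.
    public static Func<Task> AsTask(Action work) => () =>
    {
        try
        {
            work();                        // runs synchronously on the thread that invokes the delegate
            return Task.CompletedTask;     // nothing left to await
        }
        catch (Exception ex)
        {
            return Task.FromException(ex); // surface the failure through the returned Task
        }
    };

    public static async Task Main()
    {
        var log = new List<string>();
        Func<Task> item = AsTask(() => log.Add("written"));
        await item();
        Console.WriteLine(log.Count);
    }
}
```

With the class above, a call would then look something like queue.Enqueue(SyncWork.AsTask(() => File.AppendAllText(path, line))), where path and line are placeholders. Because Enqueue starts ProcessQueue through Task.Run, the wrapped method still executes off the main thread.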
Cracker (5mo ago)
ConcurrentQueue is already thread safe, do we really need SemaphoreSlim? I have exactly the same code and am currently thinking of removing SemaphoreSlim for performance improvements
Shemesh (OP, 5mo ago)
not sure about it. maybe I don't need it. I just want to make sure that the following task runs only when the current one has finished
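For what it's worth, the two primitives guard different things: ConcurrentQueue makes the enqueue/dequeue calls themselves thread safe, while a SemaphoreSlim(1) around the dequeue-and-run step is what keeps two work items from executing at the same time. The sketch below only illustrates that gating effect; SemaphoreDemo and MaxConcurrencyAsync are made-up names, and the Task.Delay stands in for real work:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreDemo
{
    // Starts `count` work items via Task.Run, all gated by one SemaphoreSlim(1),
    // and returns the highest number of items observed inside the gate at once.
    public static async Task<int> MaxConcurrencyAsync(int count)
    {
        var gate = new SemaphoreSlim(1);
        int running = 0;
        int max = 0;

        async Task Work()
        {
            await gate.WaitAsync();
            try
            {
                int now = Interlocked.Increment(ref running);
                if (now > max) max = now;   // only the gate holder reaches this line
                await Task.Delay(5);        // placeholder for the real work
                Interlocked.Decrement(ref running);
            }
            finally
            {
                gate.Release();
            }
        }

        await Task.WhenAll(Enumerable.Range(0, count).Select(_ => Task.Run(() => Work())));
        return max;
    }

    public static async Task Main()
    {
        Console.WriteLine(await MaxConcurrencyAsync(20)); // prints 1
    }
}
```

Removing the WaitAsync/Release pair from this sketch would let several items overlap, which is exactly what the requirement "run the next one only when the current one finishes" rules out.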
Cracker (5mo ago)
I am not 100% sure either, but it looks like it's not necessary (created code review post)
Shemesh (OP, 5mo ago)
ok, I will check that. Anyhow, if the task is a synchronous one, should the queue have a type of Action or Task?
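If every work item is synchronous, a queue of Action drained by one dedicated thread is probably the simplest shape. The following is a sketch under that assumption, using BlockingCollection<Action> (which is FIFO by default because it wraps a ConcurrentQueue); ActionQueue is a hypothetical name and not a drop-in replacement for the TaskQueue above:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical class: runs synchronous work items one at a time, in FIFO order,
// on a single background thread.
public sealed class ActionQueue : IDisposable
{
    private readonly BlockingCollection<Action> _work = new BlockingCollection<Action>();
    private readonly Thread _worker;

    public ActionQueue()
    {
        _worker = new Thread(Run) { IsBackground = true };
        _worker.Start();
    }

    public void Enqueue(Action work) => _work.Add(work);

    private void Run()
    {
        // Blocks while the queue is empty; ends once CompleteAdding() was called
        // and the remaining items have been drained.
        foreach (Action work in _work.GetConsumingEnumerable())
        {
            try { work(); }
            catch (Exception ex) { Console.Error.WriteLine(ex); } // keep the worker alive
        }
    }

    // Stops accepting new work and waits for the queued items to finish.
    public void Dispose()
    {
        _work.CompleteAdding();
        _worker.Join();
        _work.Dispose();
    }
}

public static class Program
{
    public static void Main()
    {
        var seen = new List<int>();
        using (var queue = new ActionQueue())
        {
            for (int i = 0; i < 5; i++)
            {
                int n = i; // capture a copy per iteration
                queue.Enqueue(() => seen.Add(n));
            }
        } // Dispose drains the queue before returning
        Console.WriteLine(string.Join(",", seen)); // prints 0,1,2,3,4
    }
}
```

Here Dispose plays the role of WaitUntilEmpty: it returns only after the last item has actually finished, not merely after it was dequeued.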
becquerel (5mo ago)
have you looked into the Channel<T> API? it seems like a strong candidate for your use-case here. I personally use the idiom of a BackgroundService which has an injected ChannelReader<T> a lot. other code can push messages into the ChannelWriter<T> for simple in-memory queuing
becquerel (5mo ago)
Stephen Toub - MSFT
.NET Blog
An Introduction to System.Threading.Channels - .NET Blog
“Producer/consumer” problems are everywhere, in all facets of our lives. A line cook at a fast food restaurant, slicing tomatoes that are handed off to another cook to assemble a burger, which is handed off to a register worker to fulfill your order,
Cracker (5mo ago)
any idea about SemaphoreSlim, is it needed? @Becquerel
becquerel (5mo ago)
i generally see using low-level stuff like that without a clear understanding of whether you need it as a red flag in and of itself. channels use semaphores under the hood for what seems to be the OP's goal
Shemesh (OP, 5mo ago)
looks good, but I'm not sure this fits my case. How can I handle the readers? I need some mechanism that will check all the time if someone writes something, don't I?
becquerel (5mo ago)
the way you check if something has written to the queue is with await/async
c#
    // Inject this via DI
    private readonly ChannelReader<SomeRequestType> _channel;

    // This is in some class that inherits from BackgroundService
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var request in _channel.ReadAllAsync(stoppingToken))
        {
            try
            {
                await YourCodeHere(request, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing request");
            }
        }
    }
the .ReadAllAsync() method returns an IAsyncEnumerable so it is non-blocking when there is nothing in the queue

the DI registration would look something like this
c#
    var channel = Channel.CreateBounded<SomeRequestType>(new BoundedChannelOptions(100)
    {
        FullMode = BoundedChannelFullMode.Wait
    });

    builder.Services.AddSingleton(channel.Reader);
    builder.Services.AddSingleton(channel.Writer);
you could of course make the channel unbounded if you wanted to
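For anyone not using the generic host, roughly the same pattern can be tried in a plain console app. This is a sketch only, assuming .NET Core 3.0 or later (for await foreach and ReadAllAsync) and assuming the work items are synchronous Actions as in the original question; ChannelDemo and RunAsync are made-up names:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    // Pushes `count` synchronous work items through a channel and returns the
    // order in which the single consumer executed them.
    public static async Task<List<int>> RunAsync(int count)
    {
        var channel = Channel.CreateUnbounded<Action>(
            new UnboundedChannelOptions { SingleReader = true });

        var seen = new List<int>();

        // Single consumer: items run one at a time, in the order they were written.
        Task consumer = Task.Run(async () =>
        {
            await foreach (Action work in channel.Reader.ReadAllAsync())
            {
                try { work(); }
                catch (Exception ex) { Console.Error.WriteLine(ex); }
            }
        });

        for (int i = 0; i < count; i++)
        {
            int n = i; // capture a copy per iteration
            // TryWrite succeeds on an unbounded channel that has not been completed.
            channel.Writer.TryWrite(() => seen.Add(n));
        }

        channel.Writer.Complete(); // no more items; lets ReadAllAsync finish
        await consumer;            // returns once every queued item has run
        return seen;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(",", await RunAsync(5))); // prints 0,1,2,3,4
    }
}
```

The reader does not poll: ReadAllAsync simply awaits until a writer adds something, which addresses the "check all the time" concern, and awaiting the consumer after Complete() replaces the timed WaitUntilEmpty loop.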