C
C#•2d ago
nathanAjacobs

Would this AsyncQueue have issues with multiple producers and a single consumer?

First, I know Channels exist and I would obviously use them instead. I can't use them because I'm adding functionality to an existing Unity library that does not have access to them. I'm intending to use this internally in the library for IPC purposes between Unity Editor instances. Will this implementation have issues with synchronization? Also will FIFO be preserved? I think the answer is there are no issues with this, but want to confirm.
internal class AsyncQueue<T>
{
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);

public void Enqueue(T item)
{
_queue.Enqueue(item);
_semaphore.Release();
}

public async Task<T> DequeueAsync()
{
await _semaphore.WaitAsync().ConfigureAwait(false);
if (_queue.TryDequeue(out T item))
{
return item;
}

throw new InvalidOperationException("Something went wrong, there is nothing to dequeue.");
}
}
internal class AsyncQueue<T>
{
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);

public void Enqueue(T item)
{
_queue.Enqueue(item);
_semaphore.Release();
}

public async Task<T> DequeueAsync()
{
await _semaphore.WaitAsync().ConfigureAwait(false);
if (_queue.TryDequeue(out T item))
{
return item;
}

throw new InvalidOperationException("Something went wrong, there is nothing to dequeue.");
}
}
13 Replies
FusedQyou
FusedQyou•2d ago
Judging from this pull request channels support .NET Standard 2.1, which in turn Unity supports. Any particular reason why you can't just implement Channels?
GitHub
Add netstandard2.1 target to System.Threading.Channels (#38046) by ...
Resolves the #38046. What I don&#39;t particularly like is that System\Threading\Channels\ChannelReader.netcoreapp.cs is included now not only for netcoreapp targets but for netstandard2.1. I w...
nathanAjacobs
nathanAjacobsOP•2d ago
The library supports Unity versions before .NET Standard 2.1 was added
canton7
canton7•2d ago
ConcurrentQueue preserves FIFO order, yep (things like ConcurrentBag do not) Part of my brain's convinced that isn't safe, but I can't actually figure out any scenario where it wouldn't be safe
nathanAjacobs
nathanAjacobsOP•2d ago
Yeah that is what I thought, I think there are no issues. I think it would only be a problem if there were multiple consumers since I think the semaphore and dequeue operation could get out of sync
canton7
canton7•2d ago
I can't see how that could happen either tbh. It's one WaitAsync for each Dequeue Two threads can only get past the WaitAsync together if there were two items in the queue anyway
nathanAjacobs
nathanAjacobsOP•2d ago
True, I guess that would be a weird situation anyways. Thanks for the reassurance. Much appreciated!
canton7
canton7•2d ago
I'm only a bit nervous because I've seen other people making async concurrent queues around the internet, and they're all more complex than this... (I'd have to go and dig a couple out to see, and I'm afraid I need to head off)
nathanAjacobs
nathanAjacobsOP•2d ago
Yeah I saw some too, that is what made me ask honestly. No worries!
canton7
canton7•2d ago
I think Stephen Cleary has one as part of his Async library?
nathanAjacobs
nathanAjacobsOP•2d ago
Okay I'll take a look there too, thanks
wasabi
wasabi•2d ago
thertes no way to cancel waiters
nathanAjacobs
nathanAjacobsOP•2d ago
Yeah I realized that, I added a CancellationToken to DequeueAsync.
canton7
canton7•6h ago
Make sure that it isn't possibel to acquire the semaphore and then cancel without dequeueing, of course 😛

Did you find this page helpful?