CodeMan
CodeMan
CC#
Created by CodeMan on 3/4/2024 in #help
Dynamic channel/queue system where only one unit of a particular group can process at any given time
In your example, I would just need a way to ensure the work is completed as fast as possible. Probably raise an event to let the consumers know that there is a new group to pickup.
19 replies
CC#
Created by CodeMan on 3/4/2024 in #help
Dynamic channel/queue system where only one unit of a particular group can process at any given time
The "unit groups" are fairly short lived. It may see 2 or 3 messages before needing to be discarded. There is a high occurence of new "group IDs" and few items belonging to each group. The work generally resolves in no more than a second or two.
19 replies
CC#
Created by CodeMan on 3/4/2024 in #help
Dynamic channel/queue system where only one unit of a particular group can process at any given time
What you're saying just clicked with me. That is an interesting idea.
19 replies
CC#
Created by CodeMan on 3/4/2024 in #help
Dynamic channel/queue system where only one unit of a particular group can process at any given time
Ah, I think I see what you mean. There might be something I can do on that end to optimize it. Generally speaking, the units and their group IDs are incoming messages from a third party. I do not know much at runtime. I simply handle the messages as they come.
19 replies
CC#
Created by CodeMan on 3/4/2024 in #help
Dynamic channel/queue system where only one unit of a particular group can process at any given time
When a unit arrives, it is first come first serve. The only caveat being the group condition where a unit must wait for any actively processing unit of the same group.
19 replies
CC#
Created by CodeMan on 3/4/2024 in #help
Dynamic channel/queue system where only one unit of a particular group can process at any given time
Thinking about going with something like this.
internal class MyReceivingWorker : MyWorker
{
private readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> _turns = new();

protected override async Task DoWork(Item item)
{
_ = EnqueueItem(item);

await Task.CompletedTask;
}

private async Task EnqueueItem(Item item)
{
var thisTurn = new TaskCompletionSource<bool>();

var previousTurn = GetTurnToAwait(item.GroupId, thisTurn);

if (previousTurn != null)
{
await previousTurn.Task;
}

try
{
// Do stuff.
}
catch (Exception ex)
{
await EmitLog(new LogMessage(
LogSeverity.Error,
GetType().Name,
ex.Message,
ex));
}
finally
{
thisTurn.SetResult(true);
CleanupTurns();
}
}

private TaskCompletionSource<bool>? GetTurnToAwait(string groupId, TaskCompletionSource<bool> nextTurn)
{
lock (_turns)
{
TaskCompletionSource<bool>? currentTurn = null;

if (_turns.TryGetValue(groupId, out var tcs))
{
currentTurn = tcs;
}

_turns[groupId] = nextTurn;

return currentTurn;
}
}

private void CleanupTurns()
{
lock (_turns)
{
var completedEntries = _turns
.Where(x => x.Value.Task.IsCompleted)
.ToList();

foreach (var completedEntry in completedEntries)
{
_turns.TryRemove(completedEntry.Key, out _);
}
}
}
}
internal class MyReceivingWorker : MyWorker
{
private readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> _turns = new();

protected override async Task DoWork(Item item)
{
_ = EnqueueItem(item);

await Task.CompletedTask;
}

private async Task EnqueueItem(Item item)
{
var thisTurn = new TaskCompletionSource<bool>();

var previousTurn = GetTurnToAwait(item.GroupId, thisTurn);

if (previousTurn != null)
{
await previousTurn.Task;
}

try
{
// Do stuff.
}
catch (Exception ex)
{
await EmitLog(new LogMessage(
LogSeverity.Error,
GetType().Name,
ex.Message,
ex));
}
finally
{
thisTurn.SetResult(true);
CleanupTurns();
}
}

private TaskCompletionSource<bool>? GetTurnToAwait(string groupId, TaskCompletionSource<bool> nextTurn)
{
lock (_turns)
{
TaskCompletionSource<bool>? currentTurn = null;

if (_turns.TryGetValue(groupId, out var tcs))
{
currentTurn = tcs;
}

_turns[groupId] = nextTurn;

return currentTurn;
}
}

private void CleanupTurns()
{
lock (_turns)
{
var completedEntries = _turns
.Where(x => x.Value.Task.IsCompleted)
.ToList();

foreach (var completedEntry in completedEntries)
{
_turns.TryRemove(completedEntry.Key, out _);
}
}
}
}
19 replies
CC#
Created by CodeMan on 3/4/2024 in #help
Dynamic channel/queue system where only one unit of a particular group can process at any given time
Order is unfortunately important. I have considered a few variations of a concurrent dictionary as well. For example, ConcurrentDictionary<string, Channel<UnitOfWork>>. One of my problems is I do not want to eat memory by spinning up new channels/queues and then having to discard them shortly thereafter as they are most likely short lived. I considered having a shared pool of channels but that seems like it could get me into trouble. Even wilder ideas are to use something like Orleans and have a grain dedicated to a group ID. I do not have a lot of experience with this setup, so I am hoping there is maybe a pattern or library that would make this kind of problem simpler to work with.
19 replies