C#•2mo ago
Cryy

Asynchronous batch processing (queue) in C#

Hey guys, I have some batches to process, and to do so I have to send each batch to an external API. I have no experience with async programming, so I simplified it a bit by using Parallel.ForEach, which creates a thread/task for each API call and waits for the response. The reason I don't want to call the API synchronously is that processing one batch can take anywhere from one second to 30 minutes (max). The Parallel.ForEach solution seems to work fine, but from what I've read on the internet it's not a good approach, so I want to implement a kind of async queue that takes a few items from my collection of batches, starts processing them, and replaces each one with another as soon as it finishes. I tried to come up with something using ChatGPT, but it only processes the specified number of batches (3 in my case) and then does nothing. Even though the code is short, there are lots of new concepts for me, so I have no idea how to fix this.
public async Task ProcessBatchQueueAsync(IEnumerable<int> batchIds)
{
    var semaphore = new SemaphoreSlim(3);
    var tasks = new List<Task>();

    foreach (var batchId in batchIds)
    {
        await semaphore.WaitAsync();

        var task = Task.Run(async () =>
        {
            try
            {
                await ProcessSingleBatchAsync(batchId);
            }
            finally
            {
                semaphore.Release();
            }
        });

        tasks.Add(task);
    }

    await Task.WhenAll(tasks);
}

private async Task ProcessSingleBatchAsync(int batchId)
{
    _loggerController.Log(LogLevel.Info, $"Processing batch {batchId}...");
    await Task.Delay(2000);
    _loggerController.Log(LogLevel.Info, $"Finished processing batch {batchId}...");
}
13 Replies
Pobiega•2mo ago
I can't reproduce your described error. Here is the output of my experiment using your method:
[13:41:03 INF] Processing batch 3...
[13:41:03 INF] Processing batch 2...
[13:41:03 INF] Processing batch 1...
[13:41:05 INF] Finished processing batch 1
[13:41:05 INF] Finished processing batch 2
[13:41:05 INF] Finished processing batch 3
[13:41:05 INF] Processing batch 4...
[13:41:05 INF] Processing batch 5...
[13:41:07 INF] Finished processing batch 5
[13:41:07 INF] Finished processing batch 4
[13:41:07 INF] Finished processing all batches
Cryy (OP)•2mo ago
Hm, that's weird. I have a list of these IDs:
var batchIds = new List<int> { 22346, 22347, 22348, 22349, 22350, 22351, 22352, 22353, 22354, 22355, 22356, 22357 };
and I can only see the first 3 of them in the logs ...
Pobiega•2mo ago
using Serilog;

var logger = new LoggerConfiguration()
    .WriteTo.Console()
    .CreateLogger();

var batches = new List<int> { 1, 2, 3, 4, 5 };

await ProcessBatchQueueAsync(batches);

logger.Information("Finished processing all batches");

async Task ProcessBatchQueueAsync(IEnumerable<int> batchIds)
{
    var semaphore = new SemaphoreSlim(3);
    var tasks = new List<Task>();

    foreach (var batchId in batchIds)
    {
        await semaphore.WaitAsync();

        var task = Task.Run(async () =>
        {
            try
            {
                await ProcessSingleBatchAsync(batchId);
            }
            finally
            {
                semaphore.Release();
            }
        });

        tasks.Add(task);
    }

    await Task.WhenAll(tasks);
}

async Task ProcessSingleBatchAsync(int batchId)
{
    logger.Information("Processing batch {BatchId}...", batchId);
    await Task.Delay(1000 + Random.Shared.Next(5000));
    logger.Information("Finished processing batch {BatchId}", batchId);
}
That's all my code. As you can see, this runs in a console app, and I'm guessing yours is an ASP.NET backend?
Cryy (OP)•2mo ago
Yeah, exactly, .NET 4.7.2.
Pobiega•2mo ago
Oh, I'm running on .NET 9. 4.7 is over 8 years old at this point. As in, 4.7 got superseded 8 years ago.
Cryy (OP)•2mo ago
But I'm not sure it's because of the .NET version. Yeah, it is old, and we are working on migrating to a newer framework, but ... 🙂 I'm going to try it in a separate console app too. It's a shame, but I haven't done that yet.
Pobiega•2mo ago
Yeah, try the standalone console app first
Cryy (OP)•2mo ago
Hm, in a standalone app it works for me too (.NET 5). It's weird.
Pobiega•2mo ago
Make sure you try with 4.7.2. .NET 5 is built on .NET Core; it's not remotely the same as 4.7.2.
Cryy (OP)•2mo ago
I also tried it on 4.7.2, and it works the same there. I guess it's somehow related to the web app. Maybe one more question: is Parallel.ForEach really that bad an approach for my case?
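On the "only the first 3 batches run in the web app" symptom: one possible explanation, not confirmed anywhere in this thread, is that something in the web app blocks on the returned task with .Result or .Wait(). In classic ASP.NET on .NET Framework, the first three semaphore.WaitAsync() calls complete synchronously, but the fourth has to resume on the request context, which the blocking call is holding, so nothing further runs. Console apps have no such context, which would match the method working there. A minimal sketch of the same throttling pattern with ConfigureAwait(false) on the outer awaits follows; BatchRunner and the processBatch delegate are hypothetical names standing in for the real API call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BatchRunner
{
    // Same semaphore-based throttling as in the original post, but the outer
    // awaits use ConfigureAwait(false), so their continuations do not need to
    // get back onto the captured (ASP.NET request) context.
    public static async Task ProcessBatchQueueAsync(
        IEnumerable<int> batchIds,
        Func<int, Task> processBatch, // hypothetical stand-in for the API call
        int maxConcurrency = 3)
    {
        var semaphore = new SemaphoreSlim(maxConcurrency);
        var tasks = new List<Task>();

        foreach (var batchId in batchIds)
        {
            // Waits asynchronously once maxConcurrency batches are in flight.
            await semaphore.WaitAsync().ConfigureAwait(false);

            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    await processBatch(batchId).ConfigureAwait(false);
                }
                finally
                {
                    // Frees a slot so the loop can start the next batch.
                    semaphore.Release();
                }
            }));
        }

        await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```

If blocking is indeed the cause, the cleaner fix is to await the method all the way up the call chain (for example from an async controller action) instead of calling .Result or .Wait() on it.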
Pobiega•2mo ago
You'd want Parallel.ForEachAsync at the very least, since the thing you are doing in each iteration is waiting for IO, which is async. That said, you don't want to fire off thousands of HTTP requests at the same time.
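For reference, a minimal sketch of what the Parallel.ForEachAsync suggestion could look like. Note that Parallel.ForEachAsync was added in .NET 6, so it is not available on .NET Framework 4.7.2. ParallelBatchRunner and the processBatch delegate are hypothetical names, and the default limit of 10 is only an illustrative value. MaxDegreeOfParallelism caps how many batches are in flight at once, so no separate semaphore is needed here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelBatchRunner
{
    // Requires .NET 6 or later.
    public static Task ProcessAllAsync(
        IEnumerable<int> batchIds,
        Func<int, CancellationToken, ValueTask> processBatch, // hypothetical API call
        int maxConcurrency = 10)
    {
        var options = new ParallelOptions
        {
            // At most this many batches are processed concurrently;
            // a new one starts as soon as a running one finishes.
            MaxDegreeOfParallelism = maxConcurrency
        };

        return Parallel.ForEachAsync(batchIds, options, processBatch);
    }
}
```

A call might look like `await ParallelBatchRunner.ProcessAllAsync(batchIds, async (id, ct) => await Task.Delay(2000, ct));`, with the Task.Delay replaced by the real request.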
Cryy (OP)•2mo ago
I definitely don't want to fire off thousands of HTTP requests, but I can limit them to 10 or something like that. There are about 2000 of these batches (they are daily batches, so one for every day since about 2020, something like that).
Pobiega•2mo ago
As long as you still have some kind of semaphore or other limiting solution, I don't see why Parallel.ForEachAsync wouldn't work fine.
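As one example of an "other limiting solution" that also compiles on .NET Framework 4.7.2 (where Parallel.ForEachAsync does not exist), here is a sketch of a fixed pool of workers draining a shared queue. This is a swapped-in technique, not something proposed in the thread; WorkerPoolBatchRunner and processBatch are hypothetical names, and 10 workers mirrors the limit mentioned above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WorkerPoolBatchRunner
{
    public static Task ProcessAllAsync(
        IEnumerable<int> batchIds,
        Func<int, Task> processBatch, // hypothetical stand-in for the API call
        int workerCount = 10)
    {
        // All pending batch IDs go into a thread-safe queue up front.
        var queue = new ConcurrentQueue<int>(batchIds);

        // Each worker takes the next ID as soon as it finishes its current
        // batch, so at most workerCount batches are in flight at any time.
        // If a batch throws, that worker stops and the exception surfaces
        // from the returned task; the other workers keep draining the queue.
        async Task WorkerAsync()
        {
            while (queue.TryDequeue(out var batchId))
            {
                await processBatch(batchId).ConfigureAwait(false);
            }
        }

        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() => WorkerAsync()));

        return Task.WhenAll(workers);
    }
}
```

With roughly 2000 daily batches this creates only 10 long-lived tasks rather than one task per batch, which keeps the bookkeeping small even when individual batches take up to 30 minutes.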