C#, 2y ago
Cracker

❔ ✅ How to implement pool tracking in a concurrent list?

I am trying to add tracking to my logging service (Web API project). Once the track count is reached, I drain the ConcurrentBag and send the list to the logger service. I would like a review of this approach: I don't want to lose log data, since this method is called on every request.
private readonly ConcurrentBag<object> _objects = new ConcurrentBag<object>();

public void Log<T>(T data) where T : class
{
    try
    {
        _objects.Add(data);
        if (_objects.Count >= _logOption.TrackCount)
        {
            Task.Factory.StartNew(async () =>
            {
                var dataList = new List<object>();
                for (int i = 0; i < _logOption.TrackCount; i++)
                {
                    if (_objects.TryTake(out object item))
                    {
                        dataList.Add(item);
                    }
                }
                await SendToKafkaApi(dataList);
            });
        }
    }
    catch (Exception ex)
    {
        LogOnFile(ex);
    }
}
32 Replies
Tvde1, 2y ago
Is your goal to batch requests to send to Kafka, without losing or duplicating any data?
Cracker (OP), 2y ago
Yes, correct. I am more confused about the tracking and pooling.
Tvde1, 2y ago
A potential issue I can foresee: if two requests come in at the same time, they both start a new task to take items out of the bag. That would still only result in multiple batches, not necessarily in duplicate or missing data.
Cracker (OP), 2y ago
Does TryTake guarantee no duplicates?
Tvde1, 2y ago
I would guess TryTake is atomic.
The docs don't say much, so let's peek into the source code! https://source.dot.net/#System.Collections.Concurrent/System/Collections/Concurrent/ConcurrentBag.cs,ac8fb3381a7110eb
Okay, never mind 😅
Cracker (OP), 2y ago
It works with CurrentThread, which I guess is equivalent to using a lock, so I believe it's thread-safe with no duplicates.
Tvde1, 2y ago
I'm pretty sure it's safe 👍
Cracker (OP), 2y ago
I am still open to review in case I am misusing anything here.
Tvde1, 2y ago
What's the reason you're starting a new task? If SendToKafkaApi fails, will you catch or retry it?
Also, I think the recommended way to start tasks like this is Task.Run(async () =>
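A short sketch of why Task.Run is usually suggested here: Task.Factory.StartNew with an async lambda returns a Task<Task>, and the outer task completes at the lambda's first await, while Task.Run returns a task that tracks the whole async body. The class name and the TaskCompletionSource gate (a stand-in for a slow SendToKafkaApi call) are assumptions made for the demo, not code from the thread.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewVsRun
{
    public static async Task<(bool InnerPending, bool RunPending)> CompareAsync()
    {
        // The gate stands in for a slow send that has not finished yet (demo assumption).
        var gate = new TaskCompletionSource<bool>();

        // StartNew does not unwrap async lambdas: the result is Task<Task>.
        Task<Task> outer = Task.Factory.StartNew(async () => { await gate.Task; });

        // Task.Run unwraps, so this Task tracks the whole async body.
        Task run = Task.Run(async () => { await gate.Task; });

        // The outer task finishes as soon as the lambda reaches its first await.
        Task inner = await outer;
        bool innerPending = !inner.IsCompleted; // the simulated send is still running
        bool runPending = !run.IsCompleted;     // Task.Run reports that directly

        gate.SetResult(true);                   // let the simulated send finish
        await inner;                            // same effect as awaiting outer.Unwrap()
        await run;
        return (innerPending, runPending);
    }
}
```

Both flags come back true: awaiting the StartNew result alone says nothing about whether the send finished, which matters if the task is ever stored or awaited.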
Cracker (OP), 2y ago
There was a CPU-bound operation inside SendToKafkaApi, and simply not awaiting it didn't solve the issue: it still affected response time.
By CPU-bound work, you can think of it as behaving like Thread.Sleep at some point.
It's solved anyway, but still.
Tvde1, 2y ago
I think I would store a reference to the created task and not start a new one until the existing one completes.
Right now, if 100 requests come in before the first task starts calling TryTake, there will be 100 tasks.
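One way to sketch the "only one flush task at a time" idea is an Interlocked flag that a single caller can claim. This is an illustration under assumptions, not the code the OP ended up with: BatchingLogger, the send delegate (standing in for SendToKafkaApi) and LastFlush are hypothetical names, and ConcurrentQueue is swapped in for ConcurrentBag to keep log entries in arrival order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class BatchingLogger
{
    private readonly ConcurrentQueue<object> _items = new ConcurrentQueue<object>();
    private readonly int _trackCount;                 // plays the role of _logOption.TrackCount
    private readonly Func<List<object>, Task> _send;  // stand-in for SendToKafkaApi
    private int _flushing;                            // 0 = idle, 1 = a flush task owns the queue

    public BatchingLogger(int trackCount, Func<List<object>, Task> send)
    {
        if (trackCount < 1) throw new ArgumentOutOfRangeException(nameof(trackCount));
        _trackCount = trackCount;
        _send = send;
    }

    // Most recently started flush task; exposed only so callers or tests can await it.
    public Task LastFlush { get; private set; } = Task.CompletedTask;

    public void Log(object data)
    {
        _items.Enqueue(data);
        if (_items.Count >= _trackCount && TryBeginFlush())
        {
            LastFlush = Task.Run(() => FlushAsync());
        }
    }

    // Only the caller that flips the flag from 0 to 1 may flush.
    private bool TryBeginFlush() => Interlocked.CompareExchange(ref _flushing, 1, 0) == 0;

    private async Task FlushAsync()
    {
        do
        {
            try
            {
                while (_items.Count >= _trackCount)
                {
                    var batch = new List<object>(_trackCount);
                    while (batch.Count < _trackCount && _items.TryDequeue(out object item))
                    {
                        batch.Add(item);
                    }
                    await _send(batch);
                }
            }
            catch (Exception ex)
            {
                // Stand-in for LogOnFile(ex); the failed batch is dropped in this sketch.
                Console.Error.WriteLine(ex);
            }
            finally
            {
                Volatile.Write(ref _flushing, 0);
            }
        }
        // Re-check after releasing the flag so items logged in the gap are not stranded.
        while (_items.Count >= _trackCount && TryBeginFlush());
    }
}
```

With this shape, 100 concurrent requests start at most one flush task, and anything below TrackCount simply waits in the queue for the next flush, as in the original code.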
Cracker (OP), 2y ago
And TrackCount keeps increasing while another task is trying to take; that's another issue.
Tvde1, 2y ago
What is the purpose of the try-catch? It will not catch any exceptions that happen inside the task.
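To illustrate the point about the try-catch, here is a minimal sketch: a catch block around the call that starts a task does not see exceptions thrown later inside that task; they are stored on the task and only surface when it is awaited (or caught inside the task body). FailingSendAsync is a hypothetical stand-in for a SendToKafkaApi call that fails.

```csharp
using System;
using System.Threading.Tasks;

public static class BackgroundExceptionDemo
{
    // Hypothetical stand-in for a send that fails.
    private static async Task FailingSendAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("send failed");
    }

    public static async Task<(bool CaughtAtStart, bool CaughtOnAwait)> RunAsync()
    {
        bool caughtAtStart = false;
        bool caughtOnAwait = false;
        Task background = Task.CompletedTask;

        try
        {
            // Starting the task returns immediately; the exception is thrown later
            // on a thread-pool thread and is stored on the task.
            background = Task.Run(() => FailingSendAsync());
        }
        catch (InvalidOperationException)
        {
            caughtAtStart = true; // not reached
        }

        try
        {
            await background; // awaiting rethrows the stored exception
        }
        catch (InvalidOperationException)
        {
            caughtOnAwait = true;
        }

        return (caughtAtStart, caughtOnAwait);
    }
}
```

For a fire-and-forget flush that nobody awaits, that means the error handling has to live inside the task body itself.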
Cracker (OP), 2y ago
Correct, it's not needed.
Tvde1, 2y ago
TrackCount doesn't update as far as I can see. Even if there are 5 items in the bag and TrackCount is 50, the 45 extra iterations are harmless and won't do anything.
I'm not sure how I would approach this. I would probably use Rx and create a Subject<T> which this middleware pushes items into. Then, using the Rx batching operators, I'd create batches and send those batches somewhere.
Cracker (OP), 2y ago
What do you mean by Rx?
Tvde1, 2y ago
The NuGet package Reactive Extensions: https://github.com/dotnet/reactive
Tvde1, 2y ago
The concepts are documented at https://reactivex.io/ and there are implementations for many languages.
You can have subjects which stream data, and you can subscribe to them to receive data.
It's more modern than event in C#.
Cracker (OP), 2y ago
AKA the observer pattern, as they mention.
Tvde1, 2y ago
Subject<User> NewUsers = new();

NewUsers.Subscribe(newUser => Console.WriteLine($"New user {newUser.Name} was added!"));
NewUsers.Subscribe(newUser => _database.SaveUser(newUser));
NewUsers.Subscribe(newUser => _emailService.SendOnboardingEmail(newUser.EmailAddress));

NewUsers.OnNext(new User { ... });
NewUsers.OnNext(new User { ... });
This is a fairly basic example, but you can set up subscriptions for a subject, and each one gets its callback executed when data is pushed to the subject.
Or you can do:
NewUsers.Where(user => user.Name.StartsWith("A"))
    .Buffer(5)
    .Subscribe(newUsers => _emailService.SendEmailsToNewUsersWithA(newUsers.ToList()));
There are a lot of helper methods. This one buffers items in groups of 5 so we send batch emails, but you can also set a time limit.
Woah, this one is really interesting: https://gist.github.com/omnibs/6b2cbdba2685693448ee6779736a00c2
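Applied to the logging case, the count-plus-time-limit buffering might look like the sketch below. It assumes the System.Reactive NuGet package is referenced; RxBatchingLogger and the synchronous send delegate are hypothetical names, and Subject.Synchronize is used because Log would be called from concurrent requests while a plain Subject expects serialized OnNext calls.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public sealed class RxBatchingLogger : IDisposable
{
    private readonly Subject<object> _subject = new Subject<object>();
    private readonly ISubject<object> _input;
    private readonly IDisposable _subscription;

    public RxBatchingLogger(int trackCount, TimeSpan maxWait, Action<IList<object>> send)
    {
        // Serialize OnNext calls coming from concurrent request threads.
        _input = Subject.Synchronize<object>(_subject);

        _subscription = _subject
            .Buffer(maxWait, trackCount)      // emit when the buffer is full or the time limit elapses
            .Where(batch => batch.Count > 0)  // time-based windows can be empty
            .Subscribe(send);
    }

    public void Log(object data) => _input.OnNext(data);

    public void Dispose()
    {
        _input.OnCompleted();   // completing the stream flushes the last partial buffer
        _subscription.Dispose();
        _subject.Dispose();
    }
}
```

The time limit covers the case the counting approach leaves open: a handful of entries below TrackCount no longer sit in memory until more traffic arrives.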
Cracker (OP), 2y ago
We need to run load tests and some extra checks after adding packages to core operations. I will try to solve this with a simple approach, or use the existing observer implementation in our codebase.
Tvde1, 2y ago
Discuss with your team when you want to add packages like this. It can save you a lot of time and do the work for you, but it also takes time to learn :)
Cracker (OP), 2y ago
yep 😄
Tvde1, 2y ago
For your example, I'd look into what happens if SendToKafkaApi fails (e.g. throws an exception). Do you want to retry the call or put the items back into the bag?
Cracker (OP), 2y ago
No, SendToKafkaApi handles exceptions internally: it either logs to file or retries.
private readonly ConcurrentBag<Task> _tasks = new ConcurrentBag<Task>();

public async Task Log<T>(T data) where T : class
{
    _tasks.Add(SendToKafkaApi<T>(data));
    if (_tasks.Count >= _logOption.TrackCount)
    {
        var list = _tasks.ToList();
        _tasks.Clear();

        await Task.WhenAll(list);
    }
}
@cuddlevde1 how does this look? Simpler
Tvde1, 2y ago
1. Creating a lot of tasks uses a lot more resources than saving a list of items and starting one task to send them all.
2. In between _tasks.ToList() and _tasks.Clear(), a new item can be put in the bag, which is then deleted.
I would stick to having a list of items and sending a batch once in a while.
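The race in point 2 comes from copying and clearing in two separate steps. A minimal sketch of the alternative, assuming a hypothetical helper named BagDrain: remove items one at a time with TryTake, so an item added in the middle of a drain either lands in this batch or stays in the bag for the next one.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BagDrain
{
    // Removes up to maxCount items from the bag, one TryTake at a time.
    // Nothing is discarded: whatever is not taken here remains in the bag.
    public static List<T> Drain<T>(ConcurrentBag<T> bag, int maxCount)
    {
        var batch = new List<T>();
        while (batch.Count < maxCount && bag.TryTake(out T item))
        {
            batch.Add(item);
        }
        return batch;
    }
}
```

This is the same take-in-a-loop shape as the first version of Log in the thread, which is why that version does not lose items even when several drains overlap.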
Cracker (OP), 2y ago
Yeah, makes sense. Not simpler 🙂
Cracker (OP), 2y ago
I ended up using this block
Tvde1, 2y ago
Hmm, did you trigger this, @Atakan / Cracker, or did something else close it?
Cracker (OP), 2y ago
I did, force of habit. Removing idle tasks 😄
Tvde1, 2y ago
no problem!
Accord, 2y ago
Looks like nothing has happened here. I will mark this as stale and this post will be archived until there is new activity.
