C#, 16mo ago
WarChortle

❔ Trying to maximize the number of messages my worker can process while using Channels

I am working with an AWS SQS queue and using a channel to write messages to and read messages from. This is how I am writing them:
while (!cancellationToken.IsCancellationRequested)
{
    if (await _channelWriter.WaitToWriteAsync(cancellationToken))
    {
        var messages = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest()
        {
            MaxNumberOfMessages = 10,
            QueueUrl = "https://sqs.us-east-1.amazonaws.com/563122478357/TestQueue"
        }, cancellationToken);

        foreach (var message in messages.Messages)
        {
            await _channelWriter.WriteAsync(message.Body, cancellationToken);
        }
    }
}
And this is how I am reading them:
while (!stoppingToken.IsCancellationRequested)
{
    // returns when a message exists to read from
    await _channelReader.WaitToReadAsync(stoppingToken);

    var message = await _channelReader.ReadAsync(stoppingToken);
    Console.WriteLine(message);
}
My question is how async will work here. I am trying to maximize the number of messages being processed concurrently. Will C# just handle it for me, or do I have to do something to prevent a bunch of threads reading while only one thread writes (causing starvation), or vice versa?
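
For context on the question above, here is a rough sketch of one common arrangement, not a definitive answer: a single producer feeding a bounded channel, with several consumer loops reading from it. `await` on a channel does not hold a thread while waiting, so idle readers or a waiting writer should not starve each other in the thread sense; the bounded capacity is what applies backpressure to the writer. Concurrency is not automatic, though: a single read loop handles one message at a time, so parallel processing needs more than one loop. `ReceiveBatchAsync`, the capacity of 100, the delays, and the class and parameter names are all assumptions made for illustration; the fake receiver stands in for `_sqsClient.ReceiveMessageAsync` so the sketch runs without AWS.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelFanOutSketch
{
    // Hypothetical stand-in for _sqsClient.ReceiveMessageAsync:
    // returns one batch of fake message bodies after a short delay.
    private static async Task<string[]> ReceiveBatchAsync(int batchNumber, int batchSize, CancellationToken ct)
    {
        await Task.Delay(10, ct); // simulated network latency
        return Enumerable.Range(0, batchSize)
            .Select(i => $"msg-{batchNumber}-{i}")
            .ToArray();
    }

    // Runs one producer and `consumerCount` consumers; returns how many messages were processed.
    public static async Task<int> RunAsync(int batches, int consumerCount, CancellationToken ct = default)
    {
        // Bounded channel: WriteAsync waits asynchronously once 100 items are queued (assumed capacity).
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
        {
            SingleWriter = true,
            SingleReader = false,
            FullMode = BoundedChannelFullMode.Wait
        });

        var processed = 0;

        // Single producer: pulls batches and pushes each body into the channel.
        var producer = Task.Run(async () =>
        {
            try
            {
                for (var b = 0; b < batches; b++)
                {
                    foreach (var body in await ReceiveBatchAsync(b, 10, ct))
                    {
                        await channel.Writer.WriteAsync(body, ct);
                    }
                }
            }
            finally
            {
                // Lets the consumers' ReadAllAsync loops finish once the channel drains.
                channel.Writer.Complete();
            }
        }, ct);

        // Several consumers share the same reader; each item is delivered to exactly one of them.
        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var body in channel.Reader.ReadAllAsync(ct))
                {
                    await Task.Delay(5, ct); // placeholder for real message processing
                    Interlocked.Increment(ref processed);
                }
            }, ct))
            .ToArray();

        await producer;
        await Task.WhenAll(consumers);
        return processed;
    }
}
```

With the fake receiver, `await ChannelFanOutSketch.RunAsync(batches: 3, consumerCount: 4)` processes 3 batches of 10 messages. In a real worker the consumer count would likely need tuning against how long each message takes to process and how quickly SQS can be polled.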
1 Reply
Accord, 16mo ago
Looks like nothing has happened here. I will mark this as stale and this post will be archived until there is new activity.
