C
C#3d ago
Core

✅ System.Threading.Channels - Consumer is blocking the main thread where the producer is

Hello, The producer writes to the channel with WriteAsync(), but the consumer blocks the termination of the producer. The outcome should be a non-blocking read, so the producer would complete in the meantime. The following is the consumer:
c#
while (await _reader.WaitToReadAsync())
{
var metadata = await _reader.ReadAsync();
}
c#
while (await _reader.WaitToReadAsync())
{
var metadata = await _reader.ReadAsync();
}
c#
public sealed class VisitMetadataChannel
{
private readonly Channel<VisitMetadata> _channel = Channel.CreateUnbounded<VisitMetadata>(
new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false
});

public ChannelWriter<VisitMetadata> Writer => _channel.Writer;

public ChannelReader<VisitMetadata> Reader => _channel.Reader;
}
c#
public sealed class VisitMetadataChannel
{
private readonly Channel<VisitMetadata> _channel = Channel.CreateUnbounded<VisitMetadata>(
new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false
});

public ChannelWriter<VisitMetadata> Writer => _channel.Writer;

public ChannelReader<VisitMetadata> Reader => _channel.Reader;
}
32 Replies
Sehra
Sehra3d ago
how is the consumer started?
Core
CoreOP3d ago
It's a quartz background job The quartz scheduler is running separately from the .net app
Sehra
Sehra3d ago
WaitToReadAsync will wait either for an item to be available (returns true) or that you do Writer.Complete(), when it returns false if you just want the consumer to process what is available and then exit, use while (Reader.TryRead(out var item)) {}
Core
CoreOP3d ago
The consumer must run until the app stops. The producer should just write and ignore the rest
Sehra
Sehra3d ago
so how would that block terminating the producer? normally you do Writer.Complete() to say no more data is coming, then await Reader.Completion to see when everything is processed
var channel = Channel.CreateUnbounded<int>();
_ = Task.Run(async () =>
{
while (await channel.Reader.WaitToReadAsync())
{
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(await channel.Reader.ReadAsync());
}
});
await channel.Writer.WriteAsync(1);
await channel.Writer.WriteAsync(2);
await channel.Writer.WriteAsync(3);
channel.Writer.Complete();
await channel.Reader.Completion;
Console.WriteLine("done");
var channel = Channel.CreateUnbounded<int>();
_ = Task.Run(async () =>
{
while (await channel.Reader.WaitToReadAsync())
{
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(await channel.Reader.ReadAsync());
}
});
await channel.Writer.WriteAsync(1);
await channel.Writer.WriteAsync(2);
await channel.Writer.WriteAsync(3);
channel.Writer.Complete();
await channel.Reader.Completion;
Console.WriteLine("done");
Core
CoreOP3d ago
I modified the code as follows:
c#
while (_reader.TryRead(out var metadata))
{
_logger.LogInformation("Hello");
}
c#
while (_reader.TryRead(out var metadata))
{
_logger.LogInformation("Hello");
}
c#
await _writer.WriteAsync(metadata, ct);
_writer.Complete();
c#
await _writer.WriteAsync(metadata, ct);
_writer.Complete();
The producer now termines. But the consumer never consumes the message
Sehra
Sehra3d ago
in that case the consumer will exit at any point the channel is empty just keep in mind you can't write more to the channel after you complete it
Core
CoreOP3d ago
write happens during an enpoint call I think it's clearer now, thank you I will try to do the modifications The Reader.WaitToReadAsync() causes the blocking. But I don't want that at all. Is a while(true) block with delay in it a good solution?
Sehra
Sehra3d ago
that's the whole idea, it will wait either for more data to be available, or no data to ever be available you have to explain how you want it to work
Core
CoreOP3d ago
The producer writes to the channel, and it does not care what happens, it needs no response, so it must not be blocked (fire & forget) The consumer reads from the channel and processes the data without blocking anyone. The consumer exists when the main application stops running
Sehra
Sehra3d ago
if you put the consumer in a background service, you can use the stoppingToken to complete the writer and wait for reader completion or just hook the lifetime events yourself, but hosted service is quite easy
Core
CoreOP3d ago
Do you refer to the built in backgroud service? IHostedService?
Sehra
Sehra3d ago
yeah, inherit from BackgroundService and it takes care of the lifetime events
Core
CoreOP3d ago
Yes it might be more optimal for a job like this, since it's lifespan is bound to the app's lifetime Many thanks, will try to implement it Would this mean spawning separate backgound services?
Sehra
Sehra3d ago
for what?
Core
CoreOP3d ago
the stopping token, does it only signals the caller that the execution was canceled?
Sehra
Sehra3d ago
in BackgroundService, it signals that the app is stopping
Core
CoreOP3d ago
no matter what combination of write, read method I try, the producer is always blocked
c#
while (!context.CancellationToken.IsCancellationRequested)
{
_reader.TryRead(out var metadata);

if (metadata is not null)
{
//handle
}
}
c#
while (!context.CancellationToken.IsCancellationRequested)
{
_reader.TryRead(out var metadata);

if (metadata is not null)
{
//handle
}
}
c#
_writer.TryWrite(metadata)
_writer.Complete();
c#
_writer.TryWrite(metadata)
_writer.Complete();
after Complete() is called, the consumer instanly executes, blocking the producer
Sehra
Sehra3d ago
that is bad code, it will spin as fast as it can
Core
CoreOP3d ago
I know, but the delay will not solve the blocking Why is the producer even blocked is beyond me, it should only write to the channel and call it a day
Sehra
Sehra3d ago
hard to say when you only show two lines
Core
CoreOP3d ago
Everything else is business logic, unrelated to the issue. There are 2 lines in the producer that are relevant. Once the TryWrite() is called, the TryRead() executes, and the consumer is blocked
c#
_writer.TryWrite(metadata);
_writer.Complete();
c#
_writer.TryWrite(metadata);
_writer.Complete();
c#
public async Task Execute(IJobExecutionContext context)
{
while (!context.CancellationToken.IsCancellationRequested)
{
_reader.TryRead(out var metadata);

if (metadata is not null)
{
Console.WriteLine("");
}
}
}
c#
public async Task Execute(IJobExecutionContext context)
{
while (!context.CancellationToken.IsCancellationRequested)
{
_reader.TryRead(out var metadata);

if (metadata is not null)
{
Console.WriteLine("");
}
}
}
Sehra
Sehra3d ago
but why are you running the consumer via Quartz? if you plan on running it periodically via Quartz, you can use while (_reader.TryRead(out var metadata)), but i don't think Quartz is a good fit for something that should run for the duration of the application. if so use a hosted service
Core
CoreOP3d ago
Sorry I am just dumb. It was working all along it's just in debug because there was no delay it automatically executed that part of the code.... :sadge: Thank you for having the patience for me.... :))
Sehra
Sehra3d ago
no worries, good that it's working
Core
CoreOP3d ago
$close
MODiX
MODiX3d ago
If you have no further questions, please use /close to mark the forum thread as answered
Unknown User
Unknown User3d ago
Message Not Public
Sign In & Join Server To View
MODiX
MODiX3d ago
see $channel
Unknown User
Unknown User3d ago
Message Not Public
Sign In & Join Server To View
Unknown User
Unknown User3d ago
Message Not Public
Sign In & Join Server To View

Did you find this page helpful?