C
C#13mo ago
Tim

✅ Cancel stream read

I'm running a Task that constantly reads some data off a (named pipe) stream when it's present.
while (_shouldRun)
{
// .. the below is in another method, but this is the idea
using (var reader = new BinaryReader(_receiveStream, encoding, true))
{
identifier = reader.ReadUInt16();
ushort lenght = reader.ReadUInt16();
data = reader.ReadBytes(lenght);
}
}
while (_shouldRun)
{
// .. the below is in another method, but this is the idea
using (var reader = new BinaryReader(_receiveStream, encoding, true))
{
identifier = reader.ReadUInt16();
ushort lenght = reader.ReadUInt16();
data = reader.ReadBytes(lenght);
}
}
the _shouldRun indicates whether the task should continue, I set this to false when it needs to stop The issue is that the read operation on the stream is infinitely blocking, so I'm never sure if my Task will end properly Can I stop a blocking stream read operation from another thread in some way?
82 Replies
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
Fair enough, but how do I use BinaryReader with a CancellationToken? afaik that's not really a thing
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
Oh, does that directly work with async/await? I thought it was a user-implemented thing, mb ?? As I read from other sources online it is a user implemented thing Since I cannot supply a CancellationToken to reader.ReadUInt16() or any of those methods, I can't implement it properly e.g. this answer tells that you should check if a cancellation was requested in the loop (manually) which would have the same effect as using a boolean, because any read operation would still block and the token not checked until the read operation finishes So the issue is that nothing in BinaryReader accepts a CancellationToken
MODiX
MODiX13mo ago
TeBeCo
you would have the CancellationToken which holds a boolean on the cancellation being requested or not
Quoted by
<@689473681302224947> from #Cancel stream read (click here)
React with ❌ to remove this embed.
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
so there is no non-blocking API similar to BinaryReader? I have a long-running task with a loop that constantly reads from a messaging stream and invokes an event when a message is ready for handling
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
how can I while using BinaryReader tho it has no methods accepting a cancellationtoken or anything
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
c#
// Receive the next message from the receiving stream.
public EncodedStreamMessage Receive()
{
CheckReceive();
_receiveStream.Flush();

ushort identifier;
byte[] data;

lock (_receiveLock)
{
// I almost didn't see the 'leaveOpen' parameter on the BinaryReader constructor...
// That would've been a nice bug to solve...
using (var reader = new BinaryReader(_receiveStream, encoding, true))
{
identifier = reader.ReadUInt16();
ushort lenght = reader.ReadUInt16();
data = reader.ReadBytes(lenght);
}
}

var message = new EncodedStreamMessage(identifier, data);
Received?.Invoke(message);
return message;
}
c#
// Receive the next message from the receiving stream.
public EncodedStreamMessage Receive()
{
CheckReceive();
_receiveStream.Flush();

ushort identifier;
byte[] data;

lock (_receiveLock)
{
// I almost didn't see the 'leaveOpen' parameter on the BinaryReader constructor...
// That would've been a nice bug to solve...
using (var reader = new BinaryReader(_receiveStream, encoding, true))
{
identifier = reader.ReadUInt16();
ushort lenght = reader.ReadUInt16();
data = reader.ReadBytes(lenght);
}
}

var message = new EncodedStreamMessage(identifier, data);
Received?.Invoke(message);
return message;
}
This is the method that blocks until a message is available
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
:\ hmm idk if I can with this
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
since they're pipestreams
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
I'm not just gonna async everything I'm kinda new to C# But I don't think that's the right approach
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
yeah It'll just wait
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
I didn't know how to solve that really
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
I can work around BinaryReader here I guess
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
lol okay- yeah
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
MODiX
MODiX13mo ago
(Contains example for Socket TCP Echo) https://devblogs.microsoft.com/dotnet/system-io-pipelines-high-performance-io-in-net/ https://docs.microsoft.com/en-us/dotnet/standard/io/pipelines if you're handling data from/to a stream, and dealing with intermediate buffer / parser / writer, you probably should consider using System.IO.Pipelines It's handling internally * temporary buffer * memory pool to re-use memory and avoid allocation * accumulate buffer until you decide you have enough data to do something (deserialize ?) There's dedicated extensions for Stream for specific use case like : (Contains a sample to read file / dump to console) Reading (eg: to a file):
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
Writing (eg: to Console):
var writer = PipeWriter.Create(Console.OpenStandardOutput(), new StreamPipeWriterOptions(leaveOpen: true));
var writer = PipeWriter.Create(Console.OpenStandardOutput(), new StreamPipeWriterOptions(leaveOpen: true));
Tim
TimOP13mo ago
No that's fine lol I'll work around binaryreader so if I use this, I would pass a ct as a parameter to my method, and then pass that same ct to the ReadAsync method?
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
hehe that's a bit primitive if you ask me but fair enough, it's a stream
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
ah yes the function won't return normally ofc okay reasonable ok so then
c#
private void Run()
{
while (_shouldRun)
{
try
{
Received?.Invoke(_protocol.Receive());
}
catch (Exception ex)
{
if (ReceivingError == null) throw;
ReceivingError(ex);
continue;
}
}
}
c#
private void Run()
{
while (_shouldRun)
{
try
{
Received?.Invoke(_protocol.Receive());
}
catch (Exception ex)
{
if (ReceivingError == null) throw;
ReceivingError(ex);
continue;
}
}
}
This is the action method that is run with a Task
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
So is it still a good idea to run it in a Task like that? With async await in that case so my run method would also be async
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
It's just Task.Run(RunAsync) then that accepts async methods as overload wait nvm Hmm what happens with the CancellationToken that can be passed to a task? Can I access that somehow in the running method without accessing an instance variable?
c#
private Task _receivingTask;
private CancellationTokenSource _tokenSource;

private async Task RunAsync(CancellationToken token)
{
while (true)
{
if (token.IsCancellationRequested) return;

var msg = await _protocol.Receive(token);
// TODO Catch cancel exception...
Received?.Invoke(msg);
}
}

public void Start()
{
if (_receivingTask != null) throw new InvalidOperationException("Already running");
_tokenSource = new CancellationTokenSource();
var token = _tokenSource.Token;
_receivingTask = new Task(async () => await RunAsync(token), token, TaskCreationOptions.LongRunning);
_receivingTask.Start();
}
c#
private Task _receivingTask;
private CancellationTokenSource _tokenSource;

private async Task RunAsync(CancellationToken token)
{
while (true)
{
if (token.IsCancellationRequested) return;

var msg = await _protocol.Receive(token);
// TODO Catch cancel exception...
Received?.Invoke(msg);
}
}

public void Start()
{
if (_receivingTask != null) throw new InvalidOperationException("Already running");
_tokenSource = new CancellationTokenSource();
var token = _tokenSource.Token;
_receivingTask = new Task(async () => await RunAsync(token), token, TaskCreationOptions.LongRunning);
_receivingTask.Start();
}
Currently I have this Which is a little messy with the cancellationTokenSource just floating around and being passed to the task Is there some way to improve that?
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
MODiX
MODiX13mo ago
TeBeCo
might be a Task.Run(async () => await RunAsync());
Quoted by
<@689473681302224947> from #Cancel stream read (click here)
React with ❌ to remove this embed.
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
mkay
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
Hey, I implemented my reading logic like this:
c#
private static async Task<EncodedStreamMessage> CollectMessage(Stream stream, CancellationToken cancellation)
{
Console.WriteLine("Collecting message...");
byte[] buf = new byte[4]; // Create byte array to hold first two integers
await stream.ReadAsync(buf, 0, 4, cancellation); // Read first two integers
ushort identifier = BitConverter.ToUInt16(buf, 0); // Read identifier
ushort lenght = BitConverter.ToUInt16(buf, 2); // Read following lenght
Console.WriteLine("Collecting message data...");
await stream.ReadAsync(buf = new byte[lenght], 0, lenght, cancellation); // Read message data
Console.WriteLine("Collected message...");
return new EncodedStreamMessage(identifier, buf);
}
c#
private static async Task<EncodedStreamMessage> CollectMessage(Stream stream, CancellationToken cancellation)
{
Console.WriteLine("Collecting message...");
byte[] buf = new byte[4]; // Create byte array to hold first two integers
await stream.ReadAsync(buf, 0, 4, cancellation); // Read first two integers
ushort identifier = BitConverter.ToUInt16(buf, 0); // Read identifier
ushort lenght = BitConverter.ToUInt16(buf, 2); // Read following lenght
Console.WriteLine("Collecting message data...");
await stream.ReadAsync(buf = new byte[lenght], 0, lenght, cancellation); // Read message data
Console.WriteLine("Collected message...");
return new EncodedStreamMessage(identifier, buf);
}
It seems that triggering the cancellation does not immediately stop the ReadAsync action though
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
100% sure for named pipe stream? It only seems to trigger after closing the connected client I have a client that just connects and sends nothing I created a cancellationtoken with a timeout of 2 seconds I start receiving the message in a 'Loaded' (async) event handler from my WPF window
c#
private async void HandleLoaded(object sender, RoutedEventArgs e)
{
LoadPlayer();
Console.WriteLine("Test");
await _pipeStream.WaitForConnectionAsync();
Console.WriteLine("Test 2");

try
{
var msg = await _protocol.Receive(TimeSpan.FromSeconds(2));
Console.WriteLine(msg.Identifier);
}
catch (OperationCanceledException)
{
Console.WriteLine("Cancelled");
}
catch (Exception ex)
{
Console.WriteLine("Other: " + ex.Message + " ... " + ex.ToString());
}

}
c#
private async void HandleLoaded(object sender, RoutedEventArgs e)
{
LoadPlayer();
Console.WriteLine("Test");
await _pipeStream.WaitForConnectionAsync();
Console.WriteLine("Test 2");

try
{
var msg = await _protocol.Receive(TimeSpan.FromSeconds(2));
Console.WriteLine(msg.Identifier);
}
catch (OperationCanceledException)
{
Console.WriteLine("Cancelled");
}
catch (Exception ex)
{
Console.WriteLine("Other: " + ex.Message + " ... " + ex.ToString());
}

}
Test 2
Receiving...
Collecting message...
[NORMAL]Discovering patches...
[NORMAL]No patches in this directory...
[NORMAL]Connecting...
[GOOD]Connected
The thread 0x2f90 has exited with code 0 (0x0).
The program '[8928] Schilt_Visualization_TestHMI.exe: Program Trace' has exited with code 0 (0x0).
The program '[8928] Schilt_Visualization_TestHMI.exe' has exited with code 0 (0x0).
Test 2
Receiving...
Collecting message...
[NORMAL]Discovering patches...
[NORMAL]No patches in this directory...
[NORMAL]Connecting...
[GOOD]Connected
The thread 0x2f90 has exited with code 0 (0x0).
The program '[8928] Schilt_Visualization_TestHMI.exe: Program Trace' has exited with code 0 (0x0).
The program '[8928] Schilt_Visualization_TestHMI.exe' has exited with code 0 (0x0).
The prefixed ([NORMAL] and such) log entries are from my client application which runs as child When I close the app normally, after waiting for a bit for the 2 seconds to pass, it doesn't throw at all When I close my client, which disconnects the pipe client, it runs until the second ReadAsync and then cancels it prints Collecting message data... and then throws
c#
public async Task<EncodedStreamMessage> Receive(TimeSpan timeout)
{
using (var tokenSource = new CancellationTokenSource(timeout))
return await Receive(tokenSource.Token);
}

public async Task<EncodedStreamMessage> Receive(CancellationToken cancellation)
{
CheckReceive();
Console.WriteLine("Receiving...");
var message = await CollectMessage(_receiveStream, cancellation);
Received?.Invoke(message);
return message;
}
c#
public async Task<EncodedStreamMessage> Receive(TimeSpan timeout)
{
using (var tokenSource = new CancellationTokenSource(timeout))
return await Receive(tokenSource.Token);
}

public async Task<EncodedStreamMessage> Receive(CancellationToken cancellation)
{
CheckReceive();
Console.WriteLine("Receiving...");
var message = await CollectMessage(_receiveStream, cancellation);
Received?.Invoke(message);
return message;
}
intermediary receive methods that eventually call the CollectMessage method Fuck Hold on Heh? nvm
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
I though I made a mistake because I forgot to set the pipe to async with PipeOptions.Asynchronous but it still doesn't work this is my pipe server new NamedPipeServerStream(Constants.PipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); I'm starting to believe that PipeStream just doesn't support async reading but it does say 'and monitors cancellation requests'
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
no worries, I'm patient ^-^ Also I think it doesn't actually support cancelling while reading
Tim
TimOP13mo ago
From looking at the source of stream you can see that the token doesn't get passed to the read operation at all, only checked before reading:
No description
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
This is from the Stream class and PipeStream doesn't override it
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
neither overrides it NamedPipeServerStream extends PipeStream extends Stream so my stream uses that same implementation from what I can see meaning the cancellation token just gets ignored
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
wooooh I hadn't seen this one
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
I removed the new task usage yeah maybe they forgot the Task.Start
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
that's an event handler... oh well nvm that was an event handler
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
I moved it to a method so I can adjust this yeah
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
Why heh
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
right, I'll fix that
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
fair but that's a bit complex yeah
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
what I have here is just a testing HMI for my communication code which will be implemented better in the actual product but ok that's not the cause of my issue right hmm okay the readpipeasync might fix it with the Task.Run ofc Doesn't work doesn't throw after 2 seconds
c#
private static Task<int> ReadPipeAsync(PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
var result = pipe.BeginRead(buffer, offset, count, null, null);
Console.WriteLine("Reading pipe async");
return Task.Run(() =>
{
try { return pipe.EndRead(result); }
finally { registration.Dispose(); }
}, cancellationToken);
}
c#
private static Task<int> ReadPipeAsync(PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
var result = pipe.BeginRead(buffer, offset, count, null, null);
Console.WriteLine("Reading pipe async");
return Task.Run(() =>
{
try { return pipe.EndRead(result); }
finally { registration.Dispose(); }
}, cancellationToken);
}
implemented like this It does get to the debug message And then fails to ever end the read Ok, I have it working now Using the CancelIoEx method instead from the answer below It cancels correctly after 2 seconds and the OperationCancelledException gets thrown as unhandled in user code Follow up question, why does it say unhandled when I'm catching it?
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
I did in my even handler
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
Yes, I'm logging something when it catches:
c#
try
{
var msg = await _protocol.Receive(TimeSpan.FromSeconds(2));
Console.WriteLine(msg.Identifier);
}
catch (OperationCanceledException)
{
Console.WriteLine("Cancelled");
}
c#
try
{
var msg = await _protocol.Receive(TimeSpan.FromSeconds(2));
Console.WriteLine(msg.Identifier);
}
catch (OperationCanceledException)
{
Console.WriteLine("Cancelled");
}
But before that it shows up as unhandled
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
Test

Test 2
Collecting message...
Reading pipe async
[NORMAL]Discovering patches...
[NORMAL]No patches in this directory...
[NORMAL]Connecting...
[GOOD]Connected
Exception thrown: 'System.OperationCanceledException' in System.Core.dll
An exception of type 'System.OperationCanceledException' occurred in System.Core.dll but was not handled in user code
The operation was canceled.

Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Cancelled
Test

Test 2
Collecting message...
Reading pipe async
[NORMAL]Discovering patches...
[NORMAL]No patches in this directory...
[NORMAL]Connecting...
[GOOD]Connected
Exception thrown: 'System.OperationCanceledException' in System.Core.dll
An exception of type 'System.OperationCanceledException' occurred in System.Core.dll but was not handled in user code
The operation was canceled.

Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Cancelled
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
Yeah I don't have to pass it
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
Tim
TimOP13mo ago
I think I'm printing some detail.. But it shouldn't say exception unhandled right? Or is that because it goes through the Task binary
Unknown User
Unknown User13mo ago
Message Not Public
Sign In & Join Server To View
Tim
TimOP13mo ago
aight, you've helped a lot already 😅, tysm! It's working now + you gave great feedback on some bad habits Good luck with work, I'll figure it out

Did you find this page helpful?