Help with writing a command queueing system for a serial device
Hi there, I'm currently writing a library for an old IO device which operates over an RS485 serial connection.
The way it accepts commands is it takes an array of request packets in binary:
Then the board will respond like this, also in binary:
Currently I have a concept in my head of how this would work, ideally I would like to be able to queue up a bunch of request classes (Which I will refer to herein as TRequest), then call a ProcessCommands function which sends these to the IO board, receives a response, and then I can de-queue response classes that are a match for the TRequests passed to the function with the response data.
My question is, how would I be best to go about this, I've currently tried something with generic types however am struggling to do what I want with it. Heres what I have currently:
Please see attached cs file, the code is too large to write into a discord message
CommandPayload is an abstract class containing a command code as well as an abstract Serialise() function which serialises from its own parameters, and Report is an abstract class containing a status code along with a Deserialise() function which deserialises to its own paramters
Am I going the right way about this? Please note the Enqueue line, where im really not sure how to get back to the Report superclass after the ChangeType (If I can even do a ChangeType like this?)
I'm really hoping this makes sense, anything that isnt clear/is awful please do ask. Any assistance would be greatly appreciated :)
58 Replies
$code
To post C# code type the following:
```cs
// code here
```
Get an example by typing
$codegif
in chat
For longer snippets, use: https://paste.mod.gg/Also iirc serialport class only works with rs232 devices, so for other serial port devices you need a hardware adapter to go from rs232 to rs485. You can find inexpensive adapters. Depending on the version of .net System.IO.Ports.SerialPort is included or you need the nuget package https://www.nuget.org/packages/System.IO.Ports. realistically to communicate with a serial device you'll need to know the protocol, i.e baud rate, start / stop bit patterns, port numbers, etc. I don't normally open attachments on discord, if you want to paste code.
As for the command queuing system you can likely just use mediatr. https://github.com/jbogard/MediatR
GitHub
GitHub - jbogard/MediatR: Simple, unambitious mediator implementati...
Simple, unambitious mediator implementation in .NET - jbogard/MediatR
I haven't used the serial port class for some time. The last time was with Boca ticket printers.
Some things may have changed as of supported devices.
I have an rs232 to 485 adapter sitting inbetween the rs232 port and the rs485 device so this is a non issue
Will have a look at mediatr though thanks!
is this not async?
why the
Convert.ChangeType
?
does packets have bom/eom or are they fixed size messages?The only real issue should be sending the proper data across the port, which the docs can describe how better than me, it's fairly straightforward to send data.
I'm pretty swamped these days, but if you hit trouble just post code or throw it in a repo and I'll take a look.
It's usually device specific and the protocols are well defined. Some devices share protocols like modbus, MIDI, etc. Some are very custom. Years ago I needed to send data to a robotic glass cutting knife and table which iirc used modbus. I've had to work with thermal printers a few times that were way off book.
Packet response size is dependant on the request type
The deserialiser is expected to know the length of the response segments for each command based on the command in question
Yeah awesome, it is a defined spec for arcade machine boards, my project is to allow one to be translated to keyboard inputs for use with non arcade games
Also as I explained above, I'm not entirely sure if change type is suitable
It's so I can put all the commands into the same queue in the program without generic type mismatches
Because each command has its own request/response type which inherits base abstract class Command/Report
These define an abstract serialise/deserialise function which must be implemented by the super class and a few required properties that must be overridden (for command that's the command code and for report it's a report (basically if the command was successful or failed) byte
All the data in the super class is request dependant
Also no currently not async, I might change it to be in the future but for the current task I want to use the library for (input translation) it doesn't need to be async because it's only doing one thing in a console app
So it doesn't matter if the entire app is held up whilst communicating
Mediatr will handle that part
Well the typing anyway
yes, i've worked with different devices too, and that's why i would not make the things the way they are described, for example a command with payload and report, because some devices can execute commands in parallel (so you have the ability to send them in batch), some devices have commands that return values and some that doesn't, some return more than one value in sequential steps requiring a more articulate logic, and also the serialize part i would leave it in its own protocol class, because in some cases it changes depending on the physical layer (serial, telnet, usb, tcp, websocket, http, even ssh)
last one i've been working on is a chinese conference system: raw bytes, bad docs, not that much fun
mainly i would probably avoid Convert methods and try to move the most conversion steps possible to a logic that gives compile time errors instead of runtime ones
Yeah I did end up moving away from it, I think ive managed to achieve what I wanted to thanks all :D
I shall post code examples:
Is used by doing this
I wanted to avoid a library if possible it seems a bit heavy handed for what will be a rather small library
It could do with a bit of cleaning/edge case handling but this works
Lastly the actual commandgroup function, theres a seperate QueueCommand without a TRep for commands which do not provide a response (Its also what the while loop is for in DequeueCommand to skip over reading commands that do not get responded to)
They do have to be de-queued in the same order, will throw a cast exception if its not, again room for improvement
So I would probably write this by using Pipes.
Where each outbound and inbound sequence is assigned an int. Which serves as an index into an array to find a TaskCompletionSource to resume.
Not really a need for a queue you handle yourself.
I see, I think writing this asynchronously will cause more issues than it will solve though
Naw. TaskCompletionSource imo makes it easier.
Don't have to figure out how to attach responses to requests.
The protocol is not designed for asynchronous operation with multiple IO boards, the IO boards do not identify themselves upon response, and if you send a command to 2 different IO boards they may attempt to send a response on the bus at the same time and clash, and theres no way of differenciating them
From that perspective I would have to research into it
Sorry I saw Task and assumed its something asynchronous 😅
It would be.
I'm just saying TCS makes it easier to write, in whole, than even trying to do it synchronously.
I can implement this asynchonously, but it would have to block any further executions of ProcessCommand until the IO board has responsed
But yeah will have a look into it thanks
Coupled with Pipes, which makes it easy to process input in batches, without being concerned about buffers.
You dont mean named pipes do you?
No.
Have you got a docs link?
System.IO.Pipelines - .NET
Learn how to efficiently use I/O pipelines in .NET and avoid problems in your code.
So you're working with a serial connection. So a forward only stream of bytes. A single Read operation is unaware of how your device or you frames messages. In the case you posted, you have a comma.
Right got it, and I can just do all my serial port reads to fill the pipe with data?
So, a single Read operation may return a half message.
Oh just note with the example, the commas are there to make it a bit clearer
Theyre not actually in the protocol
I should have clarified a bit
Ahh, well, whatever you use to signify the end of a command.
Read is unaware of that.
The report code for the next command trails after the last byte of the current
So reading from the device into a byte[] or whatever you are doing may return a partial response.
Which you usually deal with by writing a state machine of some fashion. Where you Read over and over again, until you process at least one full 'packet'.
Then you return the packet to the caller. But, you might still have half of the next packet. So you keep that in your buffer. Wait for more data, until you again have a full packet.
There's a lot of state management that goes into that. Reading into buffers. Copying into a cache buffer. Then appending buffer with next data. Etc
Pipelines does that all for you.
Right I think I get it, the current code actually isnt using a cache buffer, it processes the data as its received using ReadByte
Yeah, so a state machine.
After you read a byte, you check some variables to determine what you expect that byte to be.
Is it supposed to be a status code? Is it supposed to be the next packet? etc.
The Pipelines maintains it's own sequence of buffers. It calls your code. You report back how much of that buffer you 'consumed'. It uses the number you report back to slice and append buffers. So, it sends you some data, you attempt to parse it, then report back the number of bytes you consumed. And it calls your code a second time, after new data comes in, but gives you the remaining bytes as well, so you can treat each call as an attempt to parse a whole packet.
Which makes your code a lot simpler: bool TryParsePacket(ReadOnlySequenc<byte> data, out int consumed);
If it can only hand you half of a packet, you just return false. And then it waits for more data, before calling your code a second time with the full packet.
Right, so a bit like a stream reader and you just tell it how many bytes you read from said stream?
Sorta. The thing with Streams is you can't tell it to wait until you receive N number of bytes.
A Stream.Read takes the maximum number of bytes to read. But it may give you less if it has less.
Right I think I get the concept of it now, so in this case, each "frame" of a packet gives an address and payload length, I assume I can use Pipelines to tell it to wait until the length amount of bytes is received and then process them
Where as I get what you mean with streams, you can request so many bytes but the stream doesnt necessarily return that many bytes
And will only return what has been sent through the serial port at the time of the stream read
Well it's undefined really what it will give you but yes. The OS and drivers are all involved.
It might have more data than it gives you in a single call. But because of how the driver buffered it in memory it may be split.
I see, but you just tell it how much of the data you processed, and on the next call it includes the unprocessed data plus any unprocessed data that was received inbetween?
Its also less copies. What it gives you is a ReadOnlySequence.
Yup
Dunno if that'll work with SerialPort on .NET though.
But in the ideal case, the ReadOnlySequence will be a list of Spans directly from the OS buffer.
Not a copied .NET array.
SerialPort has an underlying stream, I dont know if that can be passed to Pipeline to work with?
It can. It's been ages since I looked at SerialPort. Looking.
Yeah so it has no read into Span, so that's unfortunate. But Read() to feed the PipeWriter would be better than the Stream.
So for Pipelines you have two parts. A PipeReader and a PipeWriter. One side fills the PipeWriter. The other loops over the PipeReader.
This is what your writer code would look like sorta:
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
That just loops forever, until in that sample's case, the socket is closed. Which is zero bytes read.
But notice how it asks the Writer for Memory.
The Pipeline maintains a buffer pool for you. So you are asking it for memory to write into. And it gives you a buffer. Which you then report back. And that buffer you get might just be a sliced array that's reused.
Then on the reading side:
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
TryReadLine is your... I dunno, TryReadCommand or something.
Right, so TryReadLine would be where an entire command read is attempted
Yup.
And when it has everything for one command, it goes to ProcessLine
etc etc until end of packet
Yeah. So three bits of code. BIt one is just to pump data as it comes into the writer. Bit two just attempts to parse commands until the end. And then your TryParseCommand, which can be very simple: it can assume it's at the start of a command.
And you start it up like this:
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
Right, I get that from a response POV now, just need to figure out how TaskCompletionSource links into this
TaskCompletionSource is what allows you to create a Task, and finish it when something happens.
Since the board does not return any command codes, it expects the client to know what commands it issued in what order
Say you have Task SendCommand(). The point of this code would be to queue up some data to be sent to the device, and return a Task to the user which is completed when the future response arrives.
Since you need to basically 1:1 map commands to responses, you'll need to maintain that yourself. As commands come in, you give them an ID. As responses come in, you incremenet a counter.
The counter represents the ID of the original command.
So now you know which task to complete.
OH right
So between them just a simple ID of what command links to what
var tcs = new TaskCompletionSource<TCommandResponse>();
// queue command
return tcs.Task;
And then response Pipeline can reference that to figure out what its decoding
That's what your SendCommand method looks like basically.
where "queue command" is inserting the TCS into an int-indexed list of some kind.
In your ProcessResponse method, which is run by the Reading loop, you can look up the associated original TCS and complete it.
var tcs = outstanding[theid];
tcs.SetComplete(parsedResponse);
That completes the original task that you had handed out.
(you'll have to manage that list yourself, removing finished items, etc)
I'd frankly just start it with a Dictionary<int, TaskCompletionSource>(); probaby.
But I'm sure there's a better structure there. :)
Well, probably a ConcurrentDictionary<int, TaskCompletionSource<IResponse>>()
Right I see, and I can just refer to that dictionary in order to grab the response class I need to deserialise into
Becauase the Send will likely happen on a thread different than the Reader thread.
And after all that, you'll be able to send commands like: var response = await devicething.SendCommand(new Command())
Right I see, ima need a little time to read back through this and try make an implementation, this has been really helpful though thanks :)
This is all stuff I didnt realise existed
np
No docs is terrible
With anything serial
"docs" in this case is an excel file, just saying
At one job we were working on a ticketing system and needed to interact with Ticketmaster over a serial interface. Even though we needed to interface with them, and the project would fail on both sides if we didn't, we'd still have to buy their docs for some unreasonable amount. They said we were free to reverse engineer them. That was fun.