C#•6d ago
derole

Help with writing a command queueing system for a serial device

Hi there, I'm currently writing a library for an old IO device which operates over an RS485 serial connection. The way it accepts commands is it takes an array of request packets in binary:
<Command 1 code>,<Command 1 parameter bytes>,<Command 2 code>,<Command 2 parameter bytes>...
Then the board will respond like this, also in binary:
<Status of command 1>,<Response bytes for command 1>,<Status of command 2>,<Response bytes for command 2>...
Currently I have a concept in my head of how this would work. Ideally I would like to be able to queue up a bunch of request classes (which I will refer to herein as TRequest), then call a ProcessCommands function which sends these to the IO board and receives a response, after which I can de-queue response classes, filled with the response data, that match the TRequests passed to the function.

My question is: how would I best go about this? I've currently tried something with generic types but am struggling to do what I want with it. Here's what I have currently: please see the attached cs file, the code is too large to write into a Discord message.

CommandPayload is an abstract class containing a command code as well as an abstract Serialise() function which serialises from its own parameters, and Report is an abstract class containing a status code along with a Deserialise() function which deserialises into its own parameters.

Am I going the right way about this? Please note the Enqueue line, where I'm really not sure how to get back to the Report superclass after the ChangeType (if I can even do a ChangeType like this?).

I'm really hoping this makes sense; if anything isn't clear or is awful, please do ask. Any assistance would be greatly appreciated :)
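For readers following along without the attachment, here is a minimal sketch of the shape described above: an abstract command that serialises itself, an abstract report that deserialises itself, and a helper that concatenates a batch and walks the response in the same order. The names CommandPayload, Report, Serialise and Deserialise come from the post; everything else (EchoCommand, EchoReport, Batch, the 0x7F code, the one-byte status) is made up for illustration and is not taken from the device's spec.

```cs
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical base types; names follow the post, members are assumptions.
public abstract class CommandPayload
{
    public abstract byte CommandCode { get; }

    // Appends the command code, then this command's parameter bytes.
    public void Serialise(Stream output)
    {
        output.WriteByte(CommandCode);
        WriteParameters(output);
    }

    // Commands without parameters can leave this empty.
    protected virtual void WriteParameters(Stream output) { }
}

public abstract class Report
{
    public byte Status { get; private set; }

    // Reads the status byte plus the subclass's fields; returns bytes consumed.
    public int Deserialise(ReadOnlySpan<byte> input)
    {
        Status = input[0];
        return 1 + ReadBody(input.Slice(1));
    }

    protected abstract int ReadBody(ReadOnlySpan<byte> body);
}

// Made-up command/report pair, purely for illustration.
public sealed class EchoCommand : CommandPayload
{
    private readonly byte _value;
    public EchoCommand(byte value) => _value = value;
    public override byte CommandCode => 0x7F; // hypothetical code
    protected override void WriteParameters(Stream output) => output.WriteByte(_value);
}

public sealed class EchoReport : Report
{
    public byte Value { get; private set; }

    protected override int ReadBody(ReadOnlySpan<byte> body)
    {
        Value = body[0];
        return 1;
    }
}

public static class Batch
{
    // Concatenates every command into one request payload.
    public static byte[] Serialise(IEnumerable<CommandPayload> commands)
    {
        using var buffer = new MemoryStream();
        foreach (var command in commands)
            command.Serialise(buffer);
        return buffer.ToArray();
    }

    // Walks the response, letting each report consume its own segment in order.
    public static void Deserialise(ReadOnlySpan<byte> response, IEnumerable<Report> reports)
    {
        int offset = 0;
        foreach (var report in reports)
            offset += report.Deserialise(response.Slice(offset));
    }
}
```

Because each report returns how many bytes it consumed, the batch helper never needs to know response lengths itself, which matches the "deserialiser knows its own length" idea that comes up later in the thread.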
58 Replies
Mayor McCheese
Mayor McCheese•6d ago
$code
MODiX
MODiX•6d ago
To post C# code type the following: ```cs // code here ``` Get an example by typing $codegif in chat For longer snippets, use: https://paste.mod.gg/
Mayor McCheese
Mayor McCheese•6d ago
Also iirc the SerialPort class only works with RS232 devices, so for other serial devices you need a hardware adapter to go from RS232 to RS485. You can find inexpensive adapters. Depending on the version of .NET, System.IO.Ports.SerialPort is either included or you need the NuGet package https://www.nuget.org/packages/System.IO.Ports. Realistically, to communicate with a serial device you'll need to know the protocol, i.e. baud rate, start/stop bit patterns, port numbers, etc. I don't normally open attachments on Discord, so paste the code if you want it looked at. As for the command queuing system, you can likely just use MediatR. https://github.com/jbogard/MediatR
Mayor McCheese
Mayor McCheese•6d ago
I haven't used the serial port class for some time. The last time was with Boca ticket printers. Some things may have changed as of supported devices.
derole
deroleOP•6d ago
I have an RS232 to RS485 adapter sitting in between the RS232 port and the RS485 device, so this is a non-issue. Will have a look at MediatR though, thanks!
this_is_pain
this_is_pain•6d ago
is this not async? why the Convert.ChangeType? do the packets have bom/eom markers or are they fixed size messages?
Mayor McCheese
Mayor McCheese•6d ago
The only real issue should be sending the proper data across the port, which the docs can describe how better than me, it's fairly straightforward to send data. I'm pretty swamped these days, but if you hit trouble just post code or throw it in a repo and I'll take a look. It's usually device specific and the protocols are well defined. Some devices share protocols like modbus, MIDI, etc. Some are very custom. Years ago I needed to send data to a robotic glass cutting knife and table which iirc used modbus. I've had to work with thermal printers a few times that were way off book.
derole
deroleOP•6d ago
Packet response size is dependent on the request type. The deserialiser is expected to know the length of the response segments for each command based on the command in question.

Yeah awesome, it is a defined spec for arcade machine boards; my project is to allow one to be translated to keyboard inputs for use with non-arcade games.

Also, as I explained above, I'm not entirely sure if ChangeType is suitable. It's so I can put all the commands into the same queue in the program without generic type mismatches, because each command has its own request/response type which inherits from the base abstract class Command/Report. These define an abstract serialise/deserialise function which must be implemented by the subclass, and a few required properties that must be overridden (for Command that's the command code, and for Report it's a report byte, basically whether the command succeeded or failed). All the data in the subclass is request-dependent.

Also no, currently not async. I might change it in the future, but for the current task I want to use the library for (input translation) it doesn't need to be async because it's only doing one thing in a console app, so it doesn't matter if the entire app is held up whilst communicating.
Mayor McCheese
Mayor McCheese•6d ago
Mediatr will handle that part Well the typing anyway
this_is_pain
this_is_pain•6d ago
yes, i've worked with different devices too, and that's why i would not model things the way they are described (for example, a command with payload and report), because some devices can execute commands in parallel (so you can send them in batches), some devices have commands that return values and some that don't, and some return more than one value in sequential steps, requiring more elaborate logic. i would also leave the serialize part in its own protocol class, because in some cases it changes depending on the physical layer (serial, telnet, usb, tcp, websocket, http, even ssh).

the last one i've been working on is a chinese conference system: raw bytes, bad docs, not that much fun.

mainly i would probably avoid Convert methods and try to move as many conversion steps as possible into logic that gives compile time errors instead of runtime ones
derole
deroleOP•6d ago
Yeah I did end up moving away from it. I think I've managed to achieve what I wanted to, thanks all :D I shall post code examples:
private JVSPort _port;
private Queue<JVSCommandGroup> _commandQueue = new Queue<JVSCommandGroup>();

public void QueueCommand<TCmd, TRep>(TCmd command)
    where TCmd : JVSCommand
    where TRep : JVSReport, new()
{
    var jvsCmd = new JVSCommandGroup
    {
        Command = command,
        Report = new TRep()
    };
    _commandQueue.Enqueue(jvsCmd);
}

public void QueueCommand<TCmd>(TCmd command)
    where TCmd : JVSCommand
{
    var jvsCmd = new JVSCommandGroup
    {
        Command = command
    };
    _commandQueue.Enqueue(jvsCmd);
}

public void ProcessCommands()
{
    _port.SendCommands(Address, _commandQueue);
}

public TRep DequeueCommand<TRep>()
    where TRep : JVSReport
{
    var cmd = _commandQueue.Dequeue();
    while (cmd.Report == null)
        cmd = _commandQueue.Dequeue();
    return (TRep)cmd.Report;
}
It is used like this:
public void GetInfo()
{
    QueueCommand<JVSCommand, IOIdentifyResponse>(new JVSCommand(0x10));
    QueueCommand<JVSCommand, CommandRevisionResponse>(new JVSCommand(0x11));
    QueueCommand<JVSCommand, JVSRevisionResponse>(new JVSCommand(0x12));
    QueueCommand<JVSCommand, CommunicationsVersionResponse>(new JVSCommand(0x13));
    QueueCommand<JVSCommand, FeatureCheckResponse>(new JVSCommand(0x14));

    ProcessCommands();

    Identifier = DequeueCommand<IOIdentifyResponse>().Identifier;
    CommandRevision = DequeueCommand<CommandRevisionResponse>().CommandRevision;
    JVSRevision = DequeueCommand<JVSRevisionResponse>().JVSRevision;
    CommunicationVersion = DequeueCommand<CommunicationsVersionResponse>().CommunicationsVersion;
    _features = DequeueCommand<FeatureCheckResponse>().Features;
}
I wanted to avoid a library if possible; it seems a bit heavy-handed for what will be a rather small library. It could do with a bit of cleaning and edge case handling, but this works.

Lastly, the actual command group class. There's a separate QueueCommand without a TRep for commands which do not provide a response (that's also what the while loop in DequeueCommand is for: it skips over commands that do not get responded to).
public class JVSCommandGroup
{
    internal JVSCommand Command { get; set; }
    internal JVSReport? Report { get; set; }
}
They do have to be de-queued in the same order; it will throw an InvalidCastException if they're not. Again, room for improvement.
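One possible way to get rid of the ordering requirement and the runtime cast, in the spirit of the "compile time errors instead of runtime ones" advice earlier in the thread, is to hand back a typed handle when a command is queued. This is only a sketch under assumptions: PendingReport, ReportQueue, Expect, Process and RevisionReport are hypothetical names, the JVSReport here is a stand-in with an assumed Deserialise signature, and the one-byte status plus one-byte body layout is invented for the example.

```cs
using System;
using System.Collections.Generic;

// Stand-in for the thread's JVSReport; the Deserialise signature is an assumption.
public abstract class JVSReport
{
    // Parses this report from the start of data and returns the bytes consumed.
    public abstract int Deserialise(ReadOnlySpan<byte> data);
}

// Typed handle returned at queue time, so the caller never casts.
public sealed class PendingReport<TRep> where TRep : JVSReport
{
    private readonly TRep _report;
    internal bool IsComplete { get; set; }

    internal PendingReport(TRep report) => _report = report;

    public TRep Result => IsComplete
        ? _report
        : throw new InvalidOperationException("Commands have not been processed yet.");
}

public sealed class ReportQueue
{
    // Reports are stored by base type; each entry also knows how to flag its handle.
    private readonly Queue<(JVSReport Report, Action MarkDone)> _pending =
        new Queue<(JVSReport Report, Action MarkDone)>();

    public PendingReport<TRep> Expect<TRep>() where TRep : JVSReport, new()
    {
        var report = new TRep();
        var handle = new PendingReport<TRep>(report);
        Action markDone = () => handle.IsComplete = true;
        _pending.Enqueue((report, markDone));
        return handle;
    }

    // Fills every queued report, in queue order, from one response buffer.
    public void Process(ReadOnlySpan<byte> response)
    {
        int offset = 0;
        while (_pending.Count > 0)
        {
            var (report, markDone) = _pending.Dequeue();
            offset += report.Deserialise(response.Slice(offset));
            markDone();
        }
    }
}

// Made-up report: one status byte followed by one revision byte.
public sealed class RevisionReport : JVSReport
{
    public byte Status { get; private set; }
    public byte Revision { get; private set; }

    public override int Deserialise(ReadOnlySpan<byte> data)
    {
        Status = data[0];
        Revision = data[1];
        return 2;
    }
}
```

With this shape the caller keeps the handle from Expect<RevisionReport>() and reads handle.Result.Revision after Process, in any order, and asking for the wrong report type becomes a compile error rather than a cast exception.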
wasabi
wasabi•6d ago
So I would probably write this by using Pipes. Where each outbound and inbound sequence is assigned an int. Which serves as an index into an array to find a TaskCompletionSource to resume. Not really a need for a queue you handle yourself.
derole
deroleOP•6d ago
I see, I think writing this asynchronously will cause more issues than it will solve though
wasabi
wasabi•6d ago
Naw. TaskCompletionSource imo makes it easier. Don't have to figure out how to attach responses to requests.
derole
deroleOP•6d ago
The protocol is not designed for asynchronous operation with multiple IO boards. The IO boards do not identify themselves in their responses, and if you send a command to two different IO boards they may attempt to send a response on the bus at the same time and clash, and there's no way of differentiating them. From that perspective I would have to research into it. Sorry, I saw Task and assumed it's something asynchronous 😅
wasabi
wasabi•6d ago
It would be. I'm just saying TCS makes it easier to write, in whole, than even trying to do it synchronously.
derole
deroleOP•6d ago
I can implement this asynchronously, but it would have to block any further executions of ProcessCommands until the IO board has responded. But yeah, will have a look into it, thanks
wasabi
wasabi•6d ago
Coupled with Pipes, which makes it easy to process input in batches, without being concerned about buffers.
derole
deroleOP•6d ago
You dont mean named pipes do you?
wasabi
wasabi•6d ago
No.
derole
deroleOP•6d ago
Have you got a docs link?
wasabi
wasabi•6d ago
System.IO.Pipelines - .NET
Learn how to efficiently use I/O pipelines in .NET and avoid problems in your code.
wasabi
wasabi•6d ago
So you're working with a serial connection. So a forward-only stream of bytes. A single Read operation is unaware of how you or your device frame messages. In the case you posted, you have a comma.
derole
deroleOP•6d ago
Right got it, and I can just do all my serial port reads to fill the pipe with data?
wasabi
wasabi•6d ago
So, a single Read operation may return a half message.
derole
deroleOP•6d ago
Oh, just a note on the example: the commas are there to make it a bit clearer, they're not actually in the protocol. I should have clarified that a bit
wasabi
wasabi•6d ago
Ahh, well, whatever you use to signify the end of a command. Read is unaware of that.
derole
deroleOP•6d ago
The report code for the next command trails after the last byte of the current
wasabi
wasabi•6d ago
So reading from the device into a byte[] or whatever you are doing may return a partial response. Which you usually deal with by writing a state machine of some fashion. Where you Read over and over again, until you process at least one full 'packet'. Then you return the packet to the caller. But, you might still have half of the next packet. So you keep that in your buffer. Wait for more data, until you again have a full packet. There's a lot of state management that goes into that. Reading into buffers. Copying into a cache buffer. Then appending buffer with next data. Etc Pipelines does that all for you.
derole
deroleOP•6d ago
Right, I think I get it. The current code actually isn't using a cache buffer; it processes the data as it's received using ReadByte
wasabi
wasabi•6d ago
Yeah, so a state machine. After you read a byte, you check some variables to determine what you expect that byte to be. Is it supposed to be a status code? Is it supposed to be the next packet? etc.

Pipelines maintains its own sequence of buffers. It calls your code. You report back how much of that buffer you 'consumed'. It uses the number you report back to slice and append buffers. So, it sends you some data, you attempt to parse it, then report back the number of bytes you consumed. And it calls your code a second time, after new data comes in, but gives you the remaining bytes as well, so you can treat each call as an attempt to parse a whole packet. Which makes your code a lot simpler:

bool TryParsePacket(ReadOnlySequence<byte> data, out int consumed);

If it can only hand you half of a packet, you just return false. And then it waits for more data, before calling your code a second time with the full packet.
derole
deroleOP•6d ago
Right, so a bit like a stream reader and you just tell it how many bytes you read from said stream?
wasabi
wasabi•6d ago
Sorta. The thing with Streams is you can't tell it to wait until you receive N number of bytes. A Stream.Read takes the maximum number of bytes to read. But it may give you less if it has less.
derole
deroleOP•6d ago
Right, I think I get the concept of it now. So in this case, each "frame" of a packet gives an address and payload length; I assume I can use Pipelines to tell it to wait until that many bytes have been received and then process them. Whereas with streams, I get what you mean: you can request so many bytes but the stream doesn't necessarily return that many, and will only return what has come through the serial port at the time of the stream read
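A rough sketch of what such a "try to parse one whole frame" function could look like with a length-prefixed layout. The frame layout used here, one address byte, one length byte, then that many payload bytes, is an assumption made up for the example (the real spec may include sync bytes, checksums and so on), and FrameParser/TryParseFrame are hypothetical names. SequenceReader<byte> needs .NET Core 3.0 or later.

```cs
using System;
using System.Buffers;

public static class FrameParser
{
    // Assumed layout for this sketch only: [address][length][payload bytes...]
    // Returns false, leaving buffer untouched, when a whole frame is not yet available.
    public static bool TryParseFrame(
        ref ReadOnlySequence<byte> buffer, out byte address, out byte[] payload)
    {
        payload = Array.Empty<byte>();

        var reader = new SequenceReader<byte>(buffer);
        if (!reader.TryRead(out address) || !reader.TryRead(out byte length))
            return false; // header incomplete

        if (reader.Remaining < length)
            return false; // payload incomplete, wait for more data

        // Copy the payload out, then slice the consumed frame off the front.
        payload = buffer.Slice(reader.Position, length).ToArray();
        reader.Advance(length);
        buffer = buffer.Slice(reader.Position);
        return true;
    }
}
```

In a read loop this would be called repeatedly until it returns false, after which the leftover buffer position is reported back to the PipeReader so the unparsed bytes are offered again together with whatever arrives next.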
wasabi
wasabi•6d ago
Well it's undefined really what it will give you but yes. The OS and drivers are all involved. It might have more data than it gives you in a single call. But because of how the driver buffered it in memory it may be split.
derole
deroleOP•6d ago
I see, but you just tell it how much of the data you processed, and on the next call it includes the unprocessed data plus any unprocessed data that was received inbetween?
wasabi
wasabi•6d ago
Yup. It's also fewer copies. What it gives you is a ReadOnlySequence. Dunno if that'll work with SerialPort on .NET though. But in the ideal case, the ReadOnlySequence will be a list of Spans directly from the OS buffer. Not a copied .NET array.
derole
deroleOP•6d ago
SerialPort has an underlying stream, I dont know if that can be passed to Pipeline to work with?
wasabi
wasabi•6d ago
It can. It's been ages since I looked at SerialPort. Looking. Yeah so it has no read into Span, so that's unfortunate. But Read() to feed the PipeWriter would be better than the Stream.

So for Pipelines you have two parts. A PipeReader and a PipeWriter. One side fills the PipeWriter. The other loops over the PipeReader. This is what your writer code would look like sorta:

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

That just loops forever, until in that sample's case, the socket is closed. Which is zero bytes read. But notice how it asks the Writer for Memory. The Pipeline maintains a buffer pool for you. So you are asking it for memory to write into. And it gives you a buffer. Which you then report back. And that buffer you get might just be a sliced array that's reused.

Then on the reading side:

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

TryReadLine is your... I dunno, TryReadCommand or something.
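Since the question was about SerialPort's underlying stream rather than a Socket, here is a sketch of the same writer loop against a plain Stream. It assumes the stream would come from SerialPort.BaseStream, which is not exercised here; how that stream behaves around timeouts, cancellation and end-of-stream is worth checking against the docs before relying on it. SerialPump and the 64-byte minimum buffer are arbitrary choices for the example.

```cs
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

public static class SerialPump
{
    // Copies whatever the stream delivers into the pipe until the stream ends
    // (a zero-byte read) or the reading side of the pipe completes.
    public static async Task FillPipeAsync(
        Stream source, PipeWriter writer, CancellationToken cancellationToken = default)
    {
        while (true)
        {
            // Borrow a buffer from the pipe instead of allocating our own.
            Memory<byte> memory = writer.GetMemory(64);

            int bytesRead = await source.ReadAsync(memory, cancellationToken);
            if (bytesRead == 0)
                break;

            // Report how many bytes were written into the borrowed buffer.
            writer.Advance(bytesRead);

            // Make the data visible to the PipeReader.
            FlushResult result = await writer.FlushAsync(cancellationToken);
            if (result.IsCompleted)
                break;
        }

        await writer.CompleteAsync();
    }
}
```

In real use this would be started alongside the reading loop, for example with Task.WhenAll, passing the port's stream as source and pipe.Writer as writer.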
derole
deroleOP•6d ago
Right, so TryReadLine would be where an entire command read is attempted
wasabi
wasabi•6d ago
Yup.
derole
deroleOP•6d ago
And when it has everything for one command, it goes to ProcessLine etc etc until end of packet
wasabi
wasabi•6d ago
Yeah. So three bits of code. Bit one is just to pump data as it comes into the writer. Bit two just attempts to parse commands until the end. And then your TryParseCommand, which can be very simple: it can assume it's at the start of a command. And you start it up like this:

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}
derole
deroleOP•6d ago
Right, I get that from a response POV now, just need to figure out how TaskCompletionSource links into this
wasabi
wasabi•6d ago
TaskCompletionSource is what allows you to create a Task, and finish it when something happens.
derole
deroleOP•6d ago
Since the board does not return any command codes, it expects the client to know what commands it issued in what order
wasabi
wasabi•6d ago
Say you have Task SendCommand(). The point of this code would be to queue up some data to be sent to the device, and return a Task to the user which is completed when the future response arrives. Since you need to basically 1:1 map commands to responses, you'll need to maintain that yourself. As commands come in, you give them an ID. As responses come in, you increment a counter. The counter represents the ID of the original command. So now you know which task to complete.
derole
deroleOP•6d ago
OH right So between them just a simple ID of what command links to what
wasabi
wasabi•6d ago
var tcs = new TaskCompletionSource<TCommandResponse>();
// queue command
return tcs.Task;
derole
deroleOP•6d ago
And then response Pipeline can reference that to figure out what its decoding
wasabi
wasabi•6d ago
That's what your SendCommand method looks like basically, where "queue command" is inserting the TCS into an int-indexed list of some kind. In your ProcessResponse method, which is run by the Reading loop, you can look up the associated original TCS and complete it.

var tcs = outstanding[theid];
tcs.SetResult(parsedResponse);

That completes the original task that you had handed out. (You'll have to manage that list yourself, removing finished items, etc.) I'd frankly just start it with a Dictionary<int, TaskCompletionSource>(); probably. But I'm sure there's a better structure there. :) Well, probably a ConcurrentDictionary<int, TaskCompletionSource<IResponse>>()
derole
deroleOP•6d ago
Right I see, and I can just refer to that dictionary in order to grab the response class I need to deserialise into
wasabi
wasabi•6d ago
Because the Send will likely happen on a thread different than the Reader thread. And after all that, you'll be able to send commands like: var response = await devicething.SendCommand(new Command())
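Pulling the counter idea together, a minimal sketch of the bookkeeping might look like the following. ResponseCorrelator, Register and Complete are hypothetical names, and the sketch assumes responses always arrive in the same order the commands were registered, and that Register is called in the same order the commands go out on the wire (for example under the same lock as the write).

```cs
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class ResponseCorrelator<TResponse>
{
    private readonly ConcurrentDictionary<int, TaskCompletionSource<TResponse>> _outstanding =
        new ConcurrentDictionary<int, TaskCompletionSource<TResponse>>();

    private int _lastSentId = -1;     // ID of the most recently registered command
    private int _lastReceivedId = -1; // ID of the most recently matched response

    // Sending side: remember a pending command and hand back a task for its response.
    public Task<TResponse> Register()
    {
        // Run continuations asynchronously so awaiting code does not run on the read loop.
        var tcs = new TaskCompletionSource<TResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        int id = Interlocked.Increment(ref _lastSentId);
        _outstanding[id] = tcs;
        return tcs.Task;
    }

    // Reading side: the n-th response completes the n-th registered command.
    // Returns false when no matching command was outstanding.
    public bool Complete(TResponse response)
    {
        int id = Interlocked.Increment(ref _lastReceivedId);
        return _outstanding.TryRemove(id, out var tcs) && tcs.TrySetResult(response);
    }
}
```

A SendCommand method would then call Register(), write the command bytes, and return the task, while the PipeReader loop calls Complete(parsedResponse) for each parsed response. A real implementation would also want timeouts and a way to fail outstanding tasks when the port closes, which this sketch leaves out.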
derole
deroleOP•6d ago
Right I see, I'm going to need a little time to read back through this and try to make an implementation. This has been really helpful though, thanks :) This is all stuff I didn't realise existed
wasabi
wasabi•6d ago
np
Mayor McCheese
Mayor McCheese•6d ago
No docs is terrible With anything serial
this_is_pain
this_is_pain•5d ago
"docs" in this case is an excel file, just saying
Mayor McCheese
Mayor McCheese•5d ago
At one job we were working on a ticketing system and needed to interact with Ticketmaster over a serial interface. Even though we needed to interface with them, and the project would fail on both sides if we didn't, we'd still have to buy their docs for some unreasonable amount. They said we were free to reverse engineer them. That was fun.