Creating a Phoenix channel source

Hi! I need to process an audio stream (webm/opus) coming in through a channel (websocket) in Phoenix, and I'd like to use Membrane to do it. Is the way to do it to write my own Membrane source that is essentially a "BEAM message source", i.e. one that reads messages sent to it from another process and passes them along to the next stage in the pipeline, and then have the channel process send its messages to that element? Similarly, if I want to send messages back to the channel at the end of the pipeline, would I be making a "BEAM message sink"? I'm pretty sure I can make that work, but I'm not sure if there's a better way in Membrane.
Kuba Rozruba•2w ago
Hi! I think the best way for you would be to create the message sink and source yourself; it shouldn't be a lot of work. We were tinkering with the idea, but we never settled on what exactly such elements should do, or how they should work and be interacted with in a generic way, since the use cases can vary a lot. If you have any questions, feel free to ask :)
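For the sink half of the question, a minimal sketch of such a "message sink" might look like the following. It assumes membrane_core 1.x; the module name `ChannelAudio.Sink`, the `receiver` option, and the `{:pipeline_output, payload}` message shape are hypothetical names chosen for illustration, not part of Membrane.

```elixir
# Standalone-script setup; in a Phoenix app, add :membrane_core to mix.exs instead.
Mix.install([{:membrane_core, "~> 1.2"}])

defmodule ChannelAudio.Sink do
  @moduledoc "Hypothetical sink that forwards every buffer payload to another process."
  use Membrane.Sink

  # `receiver` is an assumed option: the pid (e.g. the channel process) to notify.
  def_options receiver: [spec: pid(), description: "Process that receives the payloads"]

  def_input_pad :input,
    accepted_format: _any,
    flow_control: :manual,
    demand_unit: :buffers

  @impl true
  def handle_init(_ctx, opts), do: {[], %{receiver: opts.receiver}}

  # Ask for the first buffer once the pipeline starts playing.
  @impl true
  def handle_playing(_ctx, state), do: {[demand: {:input, 1}], state}

  # Forward the payload as a plain BEAM message, then ask for the next buffer.
  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    send(state.receiver, {:pipeline_output, buffer.payload})
    {[demand: {:input, 1}], state}
  end
end
```

The receiving channel process could then match on `{:pipeline_output, payload}` in its own `handle_info/2` and push the payload to the client.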
rohanOP•2w ago
The thing I'm unsure of is whether it should be pull-based or push-based (or auto). Do you have any heuristics on how to make that call?
varsill•7d ago
Hi! It depends on whether you have any control over the pace at which the audio stream data comes through the channel. If you don't, I would start with the simplest scenario: build a Source with an :output pad in :push flow control mode. In the handle_info/3 (https://hexdocs.pm/membrane_core/1.2.2/Membrane.Element.Base.html#c:handle_info/3) implementation I would return:

{[buffer: {:output, %Membrane.Buffer{payload: <your payload from the message>, pts: <timestamp>}}], state}
and that's it. Of course, if audio data is sent faster than the rest of the pipeline can cope with, it might cause trouble, which could then be mitigated either with some of Membrane Core's buffering capabilities or by implementing the source element with a :manual pad and custom buffering inside the element's logic. If you can send some "demand" messages through your socket and have a guarantee that an audio data chunk won't be sent until it's explicitly requested, then implementing a :manual pad is the way to go for you. Best wishes! 😉
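Put together, the push-mode source described above could be sketched roughly as follows. This assumes membrane_core 1.x. The module name `ChannelAudio.Source`, the `owner` option, and the `{:source_ready, pid}` and `{:audio_chunk, payload, pts}` message shapes are hypothetical, and announcing the element's pid to an owner process is only one possible way to let the channel reach the element.

```elixir
# Standalone-script setup; in a Phoenix app, add :membrane_core to mix.exs instead.
Mix.install([{:membrane_core, "~> 1.2"}])

defmodule ChannelAudio.Source do
  @moduledoc "Hypothetical source that turns BEAM messages into Membrane buffers."
  use Membrane.Source

  # `owner` is an assumed option: the pid that will be told where to send chunks.
  def_options owner: [spec: pid(), description: "Process that feeds audio chunks"]

  def_output_pad :output, accepted_format: _any, flow_control: :push

  @impl true
  def handle_init(_ctx, opts), do: {[], %{owner: opts.owner}}

  # A stream format has to be sent before the first buffer. The owner is
  # notified only now, so it does not send chunks before the pad is playing.
  @impl true
  def handle_playing(_ctx, state) do
    send(state.owner, {:source_ready, self()})
    {[stream_format: {:output, %Membrane.RemoteStream{type: :bytestream}}], state}
  end

  # Each chunk message becomes one buffer pushed downstream.
  @impl true
  def handle_info({:audio_chunk, payload, pts}, %{playback: :playing}, state) do
    {[buffer: {:output, %Membrane.Buffer{payload: payload, pts: pts}}], state}
  end

  # Anything else (or a chunk arriving before playback starts) is ignored here.
  def handle_info(_message, _ctx, state), do: {[], state}
end
```

On the Phoenix side, the channel process would wait for `{:source_ready, source_pid}` and then call `send(source_pid, {:audio_chunk, payload, pts})` for each incoming frame. Because the pad is in :push mode, nothing here slows the sender down, which is the trade-off mentioned above.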
rohanOP•6d ago
Awesome thank you!
