Developing an advanced Jellyfish use case

Hey, I've been using Jellyfish to develop a platform for essentially one-on-one calls between two people, and it works really well. I'd now like to bring in something more advanced. I essentially want to:
1. Take the two audio streams of the two peers from Jellyfish and convert everything they're saying into text using something like Bumblebee Whisper.
2. After a certain amount of audio has been transcribed into words (or maybe just after X amount of time), feed the text into a GPT model and get some new text response.
3. Take the GPT text response and convert it back into audio using text-to-speech (using Bumblebee again?).
4. Feed the text-to-speech audio directly back into the Jellyfish room for both peers to hear it.
5. All of this would need to happen while the room is still ongoing, ideally without the entire process taking too long.
I am willing to get into the weeds of Membrane and Jellyfish to make it work, but I am new to this entire multimedia space and not sure where to start. I'm hoping I can use the infrastructure around Jellyfish as much as possible. So far I know that I'll need a third JF component to play the GPT audio to both peers, but I'm not sure which one is suitable or how to get the audio to the component. I am also unsure how I can get access to the WebRTC peer audio streams, which I would potentially need to feed into a custom Membrane pipeline. I would appreciate any ideas from you guys. Thank you for your time!
Radosław (11mo ago)
Hi, to add this functionality to Jellyfish you will pretty much have to add a new component to Jellyfish. Under the hood each component maps to a Membrane.RTC.Engine endpoint, so you will have to create one too. Here is the documentation for creating a custom endpoint in rtc_engine: https://hexdocs.pm/membrane_rtc_engine/custom_endpoints.html Each rtc_engine endpoint is a Membrane.Bin in which you create a part of the pipeline. A simplified description of how your pipeline would look:
multiple_audio_inputs -> audio_mixer -> speech_to_text -> gpt -> text_to_speech -> bin_output_pad
While implementing all of that, you can look for inspiration in other endpoints that both subscribe to tracks and publish tracks. At the moment these endpoints are WebRTC and SIP. Similarly, when creating your custom component you should reference other components, e.g. SIP. For speech_to_text (e.g. with Bumblebee's Whisper) and text_to_speech, you will probably have to create your own custom Membrane.Filter elements.
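To make the audio_mixer stage more concrete, here is a minimal, language-agnostic sketch in Python (not Membrane code) of what mixing raw s16le audio boils down to. Each input's samples are decoded, summed sample by sample, and clamped to the 16-bit range. It assumes all inputs share the same sample rate and channel count and are already aligned in time. Real mixers such as LiveAudioMixer also have to deal with timing and missing data, which this ignores, and the function names are illustrative.

```python
import struct

S16_MIN, S16_MAX = -32768, 32767

def decode_s16le(payload: bytes) -> list:
    """Decode little-endian signed 16-bit PCM into a list of ints."""
    count = len(payload) // 2
    return list(struct.unpack(f"<{count}h", payload[: count * 2]))

def encode_s16le(samples: list) -> bytes:
    """Encode ints as little-endian signed 16-bit PCM."""
    return struct.pack(f"<{len(samples)}h", *samples)

def mix_s16le(payloads: list) -> bytes:
    """Sum time-aligned s16le payloads sample by sample, clamping to avoid wraparound."""
    decoded = [decode_s16le(p) for p in payloads]
    length = max((len(d) for d in decoded), default=0)
    mixed = []
    for i in range(length):
        # Inputs shorter than the longest one contribute silence (nothing) here.
        total = sum(d[i] for d in decoded if i < len(d))
        mixed.append(max(S16_MIN, min(S16_MAX, total)))
    return encode_s16le(mixed)

if __name__ == "__main__":
    a = encode_s16le([1000, 30000, -30000])
    b = encode_s16le([500, 10000, -10000])
    print(decode_s16le(mix_s16le([a, b])))
```

Clamping is the simplest overflow strategy. It also shows why a mixed stream no longer carries any information about which peer produced which sample.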
Jdyn (OP, 11mo ago)
This is really helpful. After going through your link and doing some research, my current idea is to create two separate membrane_rtc_engine endpoints:
1. An endpoint that subscribes to all audio tracks in a JF room and converts all audio into a timelined transcript.
2. An endpoint that takes a textual input, converts it to voice audio and voices it into a Jellyfish room.
I would then probably need to create two different Jellyfish components to interface with those engine endpoints. Am I on a good track? The reason for splitting it in two is so that I could potentially save the call transcript separately 🤔 Then I can just do the GPT work in between, feeding the transcript to that second voice endpoint. I suppose it could be just one JF component, though.
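A "timelined transcript" can be as simple as per-track lists of timestamped segments merged by start time. The sketch below is a hypothetical data structure in Python. The Segment fields and the merge_transcripts/render names are illustrative and not part of any Jellyfish or Membrane API. It assumes each track's segments are already sorted by start time, as they would be when produced in order from a live stream.

```python
import heapq
from dataclasses import dataclass, field

@dataclass(order=True)
class Segment:
    start_ms: int  # offset from the start of the room; the only field used for ordering
    end_ms: int = field(compare=False)
    speaker: str = field(compare=False)
    text: str = field(compare=False)

def merge_transcripts(per_track: dict) -> list:
    """Merge per-speaker segment lists (each sorted by start_ms) into one timeline."""
    return list(heapq.merge(*per_track.values()))

def render(timeline: list) -> str:
    """Format the merged timeline as readable lines."""
    return "\n".join(f"[{s.start_ms / 1000:.1f}s] {s.speaker}: {s.text}" for s in timeline)

if __name__ == "__main__":
    alice = [Segment(0, 1200, "alice", "Hi!"), Segment(3000, 4100, "alice", "Sounds good.")]
    bob = [Segment(1500, 2800, "bob", "Hey, ready to start?")]
    print(render(merge_transcripts({"alice": alice, "bob": bob})))
```

Because the timeline is derived from the per-track lists, the full transcript can still be stored separately, whichever way the endpoints end up being split.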
Radosław (11mo ago)
Both approaches sound valid, and in the end this is an implementation detail. I think you can start with an endpoint that creates a timelined transcript and then decide whether it is easier to extend this endpoint or create a separate one. The benefit of one endpoint, as I see it, is that you encapsulate the whole functionality in one process and won't have to deal with synchronization problems. For example: how will the second endpoint (response_to_speech) know that a response has been created (polling doesn't sound like an optimal idea)? And what should happen to the second endpoint if the first one crashes? Also, if you create two endpoints, you will have to create another element, a Membrane.Source, which will create Membrane.Buffers from the responses and maybe transform them a little.
Jdyn (OP, 11mo ago)
Really good points, I'll definitely have to consider these. So you're saying that along with the two RTC endpoints, I would need a third piece, a Membrane.Source that creates Membrane.Buffers. What would I be giving as input to this Source? The raw text? And what would it output? I apologize, I am still getting familiar with the terminology.
Radosław (11mo ago)
You will use the Membrane.Source in your second endpoint (the one that transforms textual input into voice audio), and the output pretty much depends on how you imagine it 😅. An example of a Membrane.Source is Membrane.File.Source (https://github.com/membraneframework/membrane_file_plugin/blob/master/lib/membrane_file/source.ex). You need one in that case because you need a source of the stream to pass to rtc_engine, and in most cases the source of a stream is some element that uses Membrane.Source. (There is also Membrane.Endpoint, which is both a Sink and a Source, but you shouldn't think about it for now.)
To visualize it with two endpoints, the first will look like this:
input_pads_from_rtc_engine -> audio_mixer -> speech_to_text -> gpt_sink
(It's a GPT sink because the end of a pipeline branch is, in most cases, some element that implements Membrane.Sink.) The second one will look like this:
gpt_source -> text_to_speech -> bin_output_pad
This visualization is simplified and lacks some elements, e.g. TrackSender, TrackReceiver, etc., but its goal is to give you a slightly better understanding. With one endpoint it would look like I mentioned before:
multiple_audio_inputs -> audio_mixer -> speech_to_text -> gpt -> text_to_speech -> bin_output_pad
This may have to be separated a little because of latency (you would wait too long for the response from GPT):
multiple_audio_inputs -> audio_mixer -> speech_to_text -> gpt_endpoint
gpt_endpoint -> text_to_speech -> bin_output_pad
But this would still be within the same endpoint.
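The reason for splitting off the GPT step is that a slow request should not stall the audio path. Here is a rough asyncio sketch of that idea in Python (not Membrane code). Transcribed chunks keep being consumed while GPT requests run as background tasks, and the responses are gathered once they complete. fake_gpt, its delay, and run_pipeline are hypothetical stand-ins for a real API call and your own processing.

```python
import asyncio

async def fake_gpt(prompt: str) -> str:
    """Stand-in for a slow GPT request (hypothetical)."""
    await asyncio.sleep(0.05)  # simulated network latency
    return f"reply({prompt})"

async def run_pipeline(transcribed_chunks):
    handled = []
    pending = []
    for chunk in transcribed_chunks:
        handled.append(chunk)  # the speech-to-text side keeps consuming audio...
        # ...while the GPT request runs in the background instead of blocking it.
        pending.append(asyncio.create_task(fake_gpt(chunk)))
        await asyncio.sleep(0)  # let other tasks make progress
    responses = await asyncio.gather(*pending)  # results keep the order of the requests
    return handled, list(responses)

if __name__ == "__main__":
    print(asyncio.run(run_pipeline(["hello", "how are you?"])))
```

Inside a Membrane endpoint, the analogous move would be to avoid blocking a callback on the GPT call and instead emit the response downstream when it arrives.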
Jdyn (OP, 11mo ago)
OK, your info is exactly what I was hoping for. I think I have a good enough idea to begin working on the implementation. Going to try not to bother you too much. When you refer to the audio_mixer, is the idea to merge all of the audio tracks from the JF room into one stream to pass into Whisper? I'm not sure whether this would prevent me from knowing who said what 🤔 Would this LiveAudioMixer work here? https://hexdocs.pm/membrane_audio_mix_plugin/Membrane.LiveAudioMixer.LiveQueue.html#summary
Radosław (11mo ago)
Yup, I thought that this would be a good idea, but maybe it won't be in your use case. The main advantage of this approach is that you have only one instance of the ML model, so you don't have to handle several instances fighting over resources. It is also simpler to implement, since there are fewer elements. And yup, the linked element should work: we use it in the SIP Endpoint and the HLS Endpoint, and it works in real time.
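Keeping a single model instance does not have to mean losing who said what. Chunks from every track can go through one worker as long as each chunk carries its track id. The minimal Python sketch below uses a hypothetical transcribe stand-in (a real one would call Whisper) and shows one shared worker that preserves speaker attribution.

```python
import queue
import threading

def transcribe(chunk: bytes) -> str:
    """Stand-in for a real speech-to-text call (hypothetical)."""
    return f"<{len(chunk)} bytes of speech>"

def model_worker(jobs, results):
    # The only place the model is used, so several instances never compete for resources.
    while (job := jobs.get()) is not None:
        track_id, chunk = job
        results.append((track_id, transcribe(chunk)))

def transcribe_tracks(chunks):
    """chunks: iterable of (track_id, s16le_bytes) pairs in arrival order."""
    jobs = queue.Queue()
    results = []
    worker = threading.Thread(target=model_worker, args=(jobs, results))
    worker.start()
    for track_id, chunk in chunks:
        jobs.put((track_id, chunk))  # each chunk keeps its source track id
    jobs.put(None)  # sentinel: stop the worker
    worker.join()
    return results
```

The trade-off compared to mixing is that the model processes every track's audio separately, so its total workload grows with the number of speakers.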
Jdyn (OP, 11mo ago)
I just noticed this got merged yesterday, which seems like it would allow me to stream multiple inputs into Whisper at once, i.e. all of the audio streams at once using a batched run 🤔 https://github.com/elixir-nx/bumblebee/issues/261
Radosław (11mo ago)
You can try to use it. Unfortunately, I am not up to date with the current state of Bumblebee 😅.
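However the streaming support ends up working, a live pipeline still has to cut the continuous stream into windows before handing them to the model. Below is a minimal Python chunker for raw s16le audio. It assumes 16 kHz mono input, the sample rate Whisper models work with. The 5-second window is an arbitrary choice, the class name is hypothetical, and Bumblebee's own chunking options are not modelled here.

```python
class S16leChunker:
    """Accumulates raw s16le bytes and emits fixed-duration chunks (hypothetical helper)."""

    def __init__(self, sample_rate: int = 16_000, channels: int = 1, seconds: float = 5.0):
        bytes_per_frame = 2 * channels  # s16le: 2 bytes per sample per channel
        self.chunk_bytes = int(sample_rate * seconds) * bytes_per_frame
        self._pending = bytearray()

    def push(self, payload: bytes) -> list:
        """Add one buffer's payload; return every complete chunk that is now available."""
        self._pending.extend(payload)
        chunks = []
        while len(self._pending) >= self.chunk_bytes:
            chunks.append(bytes(self._pending[: self.chunk_bytes]))
            del self._pending[: self.chunk_bytes]
        return chunks

    def flush(self) -> bytes:
        """Return whatever is left, e.g. when the track is removed."""
        rest, self._pending = bytes(self._pending), bytearray()
        return rest
```

Shorter windows reduce latency but give the model less context per call, so the window length is a tuning knob rather than a fixed value.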
Jdyn (OP, 11mo ago)
Yeah, for now I've switched to keeping it as simple as possible, but I'm still trying to wrap my head around Membrane; it is very complex 😅 My current goal is to keep the streams separate, run each track through the Bumblebee translator, have the response returned to the RTC endpoint bin, and then just print the translations using the debug sink. Once I create the spec in handle_pad_added, I can't seem to pass the tracks into a filter where I am trying to start the translation process. I am getting this error and can't seem to find documentation for it. A timeout occurs whenever I add my custom filter to the pad_added spec:
21:03:40.088 [warning] Process 66-842c-461cd7dd is down with reason: {:timeout, {GenServer, :call, [#PID<0.1505.0>, {Membrane.Core.Message, :start_component, [{:endpoint, "1234_scribe"}, Membrane.Core.Bin, %{group: "1234_scribe", log_metadata: [rtc_engine: "66-842c-461cd7dd"]
I've uploaded my progress here, but it is a nightmare: https://github.com/Jdyn/membrane_scribe/tree/main/lib Here, I think there is something very wrong with the setup: https://github.com/Jdyn/membrane_scribe/blob/main/lib/scribe_endpoint.ex#L61
Radosław (11mo ago)
It looks to me like some callback is taking too much time for some reason. Try to find the operation that takes a very long time. If it happens in handle_init, you can try moving it to handle_setup.
Radosław (11mo ago)
(Linked: membrane_scribe/lib/scribe/filter.ex at 380a114ad14832c658448ae5a5c...)
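Here is why moving work out of handle_init helps, in language-agnostic form. As I understand Membrane's startup, the parent waits for the component to finish initializing under a timeout. Anything slow there, such as loading a Whisper model, makes the whole start call time out, while the same work done in a later setup step does not. The Python below only models that timing behaviour with threads and made-up timeouts. Component and start_component are hypothetical, and this is not how Membrane is implemented.

```python
import threading
import time

class Component:
    """Toy component whose expensive step (e.g. loading a model) runs in init or in setup."""

    def __init__(self, load_in_init: bool, load_seconds: float):
        self.load_in_init = load_in_init
        self.load_seconds = load_seconds
        self.model_ready = threading.Event()

    def _load_model(self):
        time.sleep(self.load_seconds)  # stand-in for an expensive startup step
        self.model_ready.set()

    def init(self):
        if self.load_in_init:
            self._load_model()  # the parent is still waiting while this runs

    def setup(self):
        if not self.load_in_init:
            # Runs after the parent has stopped waiting (modelled here with a thread).
            threading.Thread(target=self._load_model, daemon=True).start()

def start_component(component, timeout):
    """Parent side: wait for init under a timeout, like a blocking start call."""
    initialized = threading.Event()

    def run_init():
        component.init()
        initialized.set()

    threading.Thread(target=run_init, daemon=True).start()
    if not initialized.wait(timeout):
        return "timeout"
    component.setup()
    return "started"
```

In the model, the expensive work still happens in both cases. The difference is only whether the parent is blocked on it.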
Jdyn (OP, 11mo ago)
Thanks, that worked. I've gotten to the point where handle_buffer is getting called in the filter, but the buffers' pts and dts are nil, which is preventing them from being added to the LiveQueue:
buffer #=> %Membrane.Buffer{
payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, ...>>,
pts: nil,
dts: nil,
metadata: %{}
}
https://github.com/membraneframework/membrane_audio_mix_plugin/blob/v0.16.0/lib/membrane_live_audio_mixer/live_queue.ex#L123 How come the pts field is nil in my case / how do I populate it?
Jdyn (OP, 11mo ago)
LiveQueue does this operation: pts = pts + queue.offset, so it needs a non-nil pts.
Radosław (11mo ago)
To get timestamps, you can try adding raw_audio_parser before scribe_endpoint. It should add timestamps. https://github.com/membraneframework/membrane_raw_audio_parser_plugin/tree/master
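Conceptually, timestamps for raw audio can be derived from how many bytes have passed so far. That is the kind of bookkeeping a raw audio parser can do for you. Below is a minimal Python sketch. It assumes s16le at 48 kHz stereo (the real format depends on what your decoder outputs) and nanosecond timestamps, which is Membrane's internal time unit. The stamp_buffers name is illustrative and not the plugin's API.

```python
NS_PER_SECOND = 1_000_000_000

def stamp_buffers(payload_sizes, sample_rate=48_000, channels=2, bytes_per_sample=2):
    """Yield (pts_ns, size) for consecutive raw-audio buffers, starting at pts 0."""
    bytes_per_second = sample_rate * channels * bytes_per_sample
    consumed = 0
    for size in payload_sizes:
        # Integer arithmetic avoids floating-point drift on long streams.
        pts = consumed * NS_PER_SECOND // bytes_per_second
        yield pts, size
        consumed += size
```

At 48 kHz stereo s16le, 20 ms of audio is 3840 bytes, so consecutive 3840-byte buffers get pts values 20,000,000 ns apart.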
Jdyn (OP, 11mo ago)
Thanks, it worked 👍 So if I have the two peer audio tracks from the JF room as input pads to a filter with manual flow control, whenever I "demand" more data from the pads, am I going to get data from the point where I last demanded? Or am I going to "miss" some audio if enough comes in before I next "demand" it? On a side note, is there any easy way to create tests for the whole thing with two peers producing an audio stream? I was looking at the engine integration test, but I'm not sure of the best way to work with it.
Radosław (11mo ago)
Yup, you will get data from the point where you last demanded, but remember that you have to demand at the proper speed. If you fall too far behind, you will receive an error named ToiletOverflow. It simply means that the buffers stored and waiting for your demand exceeded the limit.
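The demand model can be pictured as a bounded buffer between the producer and your filter. Nothing is dropped while you catch up, but if the backlog grows past a limit, the element fails instead of buffering forever. The simplified Python model below illustrates that behaviour. The limit value, the InputPad class and the ToiletOverflow exception are illustrative and do not mirror Membrane's actual defaults or error types.

```python
from collections import deque

class ToiletOverflow(Exception):
    """Raised when too many undemanded buffers pile up (illustrative only)."""

class InputPad:
    def __init__(self, limit: int = 4):
        self.limit = limit
        self._waiting = deque()

    def receive(self, buffer) -> None:
        """Producer side: buffers queue up until they are demanded."""
        self._waiting.append(buffer)
        if len(self._waiting) > self.limit:
            raise ToiletOverflow(f"{len(self._waiting)} buffers waiting, limit is {self.limit}")

    def demand(self, n: int) -> list:
        """Consumer side: take up to n buffers, oldest first, so nothing is skipped."""
        return [self._waiting.popleft() for _ in range(min(n, len(self._waiting)))]
```

The oldest-first popleft is what gives you "data from the point where you last demanded". The limit is what turns falling behind into an error rather than silent data loss.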
Radosław (11mo ago)
I think the easiest way to do that would be to use FileEndpoints imitating the two peers. Example tests for the file endpoint: https://github.com/jellyfish-dev/membrane_rtc_engine/blob/e1d5c4d09f8dd1755017283d104ec84167ed2ca1/file/test/file_endpoint_test.exs#L54
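For a FileEndpoint-based test, it helps to have small, deterministic audio fixtures for the two fake peers. The Python sketch below writes a few seconds of a distinct sine tone per peer as raw s16le. The frequencies, durations, 48 kHz rate and file names are arbitrary assumptions. You would still need to encode or package the audio in whatever format your FileEndpoint configuration expects, and this sketch does not do that step.

```python
import math
import os
import struct
import tempfile

def sine_s16le(freq_hz: float, seconds: float, sample_rate: int = 48_000, amplitude: float = 0.3) -> bytes:
    """Generate a mono sine tone as little-endian signed 16-bit PCM."""
    n = int(sample_rate * seconds)
    peak = amplitude * 32767
    samples = (round(peak * math.sin(2 * math.pi * freq_hz * i / sample_rate)) for i in range(n))
    return struct.pack(f"<{n}h", *samples)

def write_fixtures(directory: str) -> list:
    """Write one distinct tone per fake peer so each is easy to tell apart in assertions."""
    paths = []
    for name, freq in (("peer_a", 440.0), ("peer_b", 660.0)):
        path = os.path.join(directory, f"{name}.s16le")
        with open(path, "wb") as f:
            f.write(sine_s16le(freq, seconds=3.0))
        paths.append(path)
    return paths

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        print(write_fixtures(d))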
Jdyn (OP, 10mo ago)
Thanks for all your help. I'm probably going to be working on this for a while 😅 Hey, I'm looking into setting up a test environment using the file endpoint as a source, but the test crashes with:
17:29:13.797 [error] GenServer #PID<0.458.0> terminating
** (Membrane.PadError) Tried to unlink a static pad :input. Static pads cannot be unlinked unless element is terminating
(membrane_core 1.0.1) lib/membrane/core/element/pad_controller.ex:247: Membrane.Core.Element.PadController.handle_unlink/2
(membrane_core 1.0.1) lib/membrane/core/element.ex:254: Membrane.Core.Element.handle_info/2
(stdlib 4.3.1.3) gen_server.erl:1123: :gen_server.try_dispatch/4
(stdlib 4.3.1.3) gen_server.erl:1200: :gen_server.handle_msg/6
(stdlib 4.3.1.3) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {Membrane.Core.Message, :handle_unlink, :input, []}
State: %Membrane.Core.Element.State{module: Membrane.RTC.Engine.Endpoint.WebRTC.TrackReceiver, name: {:track_receiver, "6CA58BDCB71BE517"}, ...
I am unsure what the error means or why it occurs, because I am not trying to unlink anything. I'd just like to simulate adding my endpoint, which listens for track-added events, and then adding file endpoints with audio so that the custom endpoint sees the tracks and starts processing them. The test file is here: https://github.com/Jdyn/membrane_scribe/blob/main/test/scribe_endpoint_test.exs My thought is that the audio file finishes, the file endpoint removes itself from the engine, and my endpoint doesn't know how to handle tracks being removed? In general, I am not sure how to gracefully handle a track being removed.
Radosław (10mo ago)
Pretty much, you should implement the handle_pad_removed callback, like for example in sip_endpoint: https://github.com/jellyfish-dev/membrane_rtc_engine/blob/63848ed2b271fa59c1f42a796997f5cfc7958502/sip/lib/sip_endpoint.ex#L277-L290. In this callback, remove all Membrane elements that are linked to the removed pad, because Membrane doesn't support dangling elements.
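Most of the logic behind handle_pad_removed is bookkeeping. You remember which children were spawned for each track, and when that track's pad goes away you remove exactly those children so nothing is left dangling. Below is a hypothetical Python model of that bookkeeping. The TrackChildren class and the per-track child names are made up for illustration. In the real endpoint, the returned names would be turned into a remove_children action.

```python
class TrackChildren:
    """Hypothetical per-track registry of spawned pipeline elements."""

    def __init__(self):
        self._by_track = {}  # track_id -> list of child names spawned for it

    def on_pad_added(self, track_id: str) -> list:
        children = [f"depayloader_{track_id}", f"decoder_{track_id}", f"parser_{track_id}"]
        self._by_track[track_id] = children
        return children  # would be spawned and linked to the new pad

    def on_pad_removed(self, track_id: str) -> list:
        # Everything linked to this pad must go; unknown tracks yield nothing to remove.
        return self._by_track.pop(track_id, [])

    def active_tracks(self) -> list:
        return sorted(self._by_track)
```

Returning an empty list for an unknown track keeps a repeated or late removal from crashing the endpoint.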
Jdyn (OP, 10mo ago)
Damn, it worked. Thank you!
Radosław (10mo ago)
No problem
Jdyn (OP, 10mo ago)
Currently I'm using the LiveAudioMixer as you suggested, and it's working well. But I would now like to instead take all of the audio tracks in the room and create a single multi-channel output, with each track on a different channel. So if there are two peers in the room, I'd like to combine their audio into a single output with 2 channels. Is there a plugin that can do this? I am not entirely familiar with audio channels.
Radosław (10mo ago)
Ummm, I don't know if there is a plugin for that, but maybe @Łukasz Kita will know something more or can suggest a solution to your problem.
varsill (10mo ago)
Hi @Jdyn! As far as I know, the Membrane.AudioInterleaver element available in membrane_audio_mix_plugin (https://github.com/membraneframework/membrane_audio_mix_plugin/) should be capable of doing so. The only problem is that it expects a fixed number of output channels to be provided when the element starts. If I understand correctly, in your scenario the number of channels might change over time, with users joining and leaving the room. BTW, what would you like to achieve by putting each user's audio in a separate channel? I am slightly afraid that even if we solved the problem of the number of channels changing over time in Membrane.AudioInterleaver, we could encounter a similar problem while trying to encode the audio.
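Interleaving itself is straightforward once the number of channels is fixed: for every sample index, write one sample from each input in channel order. The Python sketch below handles mono s16le inputs. It models the idea only, not Membrane.AudioInterleaver's implementation or options, and interleave_s16le is a hypothetical name. It assumes the inputs are already aligned in time and pads a shorter input with silence.

```python
import struct

def interleave_s16le(inputs: list) -> bytes:
    """Combine N mono s16le streams into one N-channel interleaved stream."""
    channels = [struct.unpack(f"<{len(p) // 2}h", p[: len(p) // 2 * 2]) for p in inputs]
    frames = max((len(c) for c in channels), default=0)
    out = []
    for i in range(frames):
        for ch in channels:
            out.append(ch[i] if i < len(ch) else 0)  # missing samples become silence
    return struct.pack(f"<{len(out)}h", *out)
```

The channel count is simply len(inputs) here. This makes the fixed-channel constraint visible: adding or removing an input changes the layout of every subsequent frame.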
Jdyn (OP, 10mo ago)
Thanks a lot for your responses. The high-level problem is that I need to transcribe what each person is saying individually without losing who is saying what. There is a service that can transcribe each channel in the audio individually. I do know the maximum number of output channels that would ever be needed in my case, but pads should be able to be added and removed when someone joins, disconnects or leaves 🤔 So I wonder if the interleaver could be modified to allow inputs to be added and removed freely, while still requiring the max number of output channels to be specified? My thought is that if we know the max channels upfront, we could fill channels that are not connected to an input with silence? Not sure.
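The fixed-maximum idea can be sketched as a small channel allocator. Each peer gets a free channel slot when its track appears and gives it back when it leaves, and every unoccupied slot is filled with silence when a frame is built. Below is a hypothetical Python sketch of that bookkeeping plus frame assembly for s16le. The ChannelSlots name and the slot-reuse policy (lowest free slot first) are assumptions.

```python
import struct

class ChannelSlots:
    """Maps peers to a fixed number of output channels (hypothetical helper)."""

    def __init__(self, max_channels: int):
        self.slots = [None] * max_channels  # peer id per channel, None = free

    def join(self, peer_id: str) -> int:
        free = self.slots.index(None)  # lowest free channel; ValueError if all are taken
        self.slots[free] = peer_id
        return free

    def leave(self, peer_id: str) -> None:
        self.slots[self.slots.index(peer_id)] = None

    def build_frame(self, samples: dict) -> bytes:
        """One interleaved s16le frame: each slot's current sample, or 0 (silence)."""
        values = [samples.get(peer, 0) if peer is not None else 0 for peer in self.slots]
        return struct.pack(f"<{len(values)}h", *values)
```

Reusing the lowest free slot means a channel can belong to different peers over time. You may therefore want to record the slot-to-peer mapping with timestamps so the transcription service's per-channel output can be attributed correctly.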
varsill (10mo ago)
OK, I see the picture now 😉 Indeed, I believe that setting a fixed max number of channels and filling the ones that are not yet "occupied" with silence seems to be a reasonable solution. What further processing needs to be performed on that audio data? Do you need to encode it before sending it to the transcription service?
Jdyn (OP, 10mo ago)
In my case I am able to send s16le to the service. So what I do is process the Jellyfish tracks down into s16le and then, ideally, send it through the interleaver. After that, no further processing is needed in my case; I just send it. What problems might occur if I did have further processing? Maybe later down the line I'd like to do something more with the audio 🤔 Is it more difficult or performance-intensive to process such multi-channel audio?
varsill (10mo ago)
Well, it should work fine with your service then 😉 However, I believe it's not that common to use channels for such a purpose. Normally channels are used for things like the sound coming from different speaker positions (e.g. left/right, or background/surround sound). That's why I think it might be difficult to "reuse" the audio stream that you send to the service without "deinterleaving" it first. After that, you could work on multiple audio streams, one per user.
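Deinterleaving, for when you later want one stream per user again, amounts to taking every N-th sample. Below is a minimal Python sketch for s16le with a known channel count. deinterleave_s16le is a hypothetical name, and the function assumes the payload contains only whole frames.

```python
import struct

def deinterleave_s16le(payload: bytes, channels: int) -> list:
    """Split an interleaved s16le payload into one mono payload per channel."""
    frame_size = 2 * channels  # bytes per frame: one 16-bit sample per channel
    if len(payload) % frame_size:
        raise ValueError("payload must contain whole frames")
    samples = struct.unpack(f"<{len(payload) // 2}h", payload)
    # samples[c::channels] picks channel c's value from every frame.
    return [struct.pack(f"<{len(samples) // channels}h", *samples[c::channels]) for c in range(channels)]
```

The cost is one pass over the data per buffer, so the extra work is modest. The harder part is bookkeeping, i.e. knowing which channel belongs to which user at a given time.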
Jdyn (OP, 10mo ago)
So is the way to create multi-channel audio basically to specify the number of channels and then, in the binary, place one frame of audio from each track one after another for that time frame, and repeat? Then it's up to the receiver to splice out each channel's "frame" to reconstruct the original tracks?
varsill (10mo ago)
Hello, well, it depends on what your service expects to receive. However, in most cases raw (uncompressed) audio is represented in the following PCM format:
1. First, you specify some fixed sampling rate (for instance 44100 Hz); your binary audio representation then contains 44100 samples per second.
2. Each sample represents the value of the audio measured at that point in time, written in some agreed format (for instance s16le, meaning a signed integer written on 16 bits with little-endian byte order), for a given number of channels (the values for each channel in a particular sample are put one after the other).
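To make the described layout concrete with the example numbers (44100 Hz, s16le), here is a small Python helper. It computes where a given channel's value for a given sample sits in the binary, reads it back, and computes the resulting data rate. It assumes headerless interleaved PCM exactly as described. The function names and the two-channel figures are illustrative.

```python
import struct

def byte_offset(sample_index: int, channel: int, channels: int, bytes_per_sample: int = 2) -> int:
    """Position of one channel's value in interleaved PCM (all channels of sample 0, then sample 1, ...)."""
    return (sample_index * channels + channel) * bytes_per_sample

def bytes_per_second(sample_rate: int, channels: int, bytes_per_sample: int = 2) -> int:
    """Data rate of uncompressed PCM."""
    return sample_rate * channels * bytes_per_sample

def read_value(payload: bytes, sample_index: int, channel: int, channels: int) -> int:
    """Decode a single s16le value ("<h" = little-endian signed 16-bit)."""
    (value,) = struct.unpack_from("<h", payload, byte_offset(sample_index, channel, channels))
    return value
```

For two channels at 44100 Hz in s16le, that is 44100 × 2 × 2 = 176400 bytes per second. Each additional channel adds another 88200 bytes per second.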
