Writing a `Bin` queuing content from multiple remote files
@skillet wrote in https://discord.com/channels/464786597288738816/1007192081107791902/1224491418626560121
Hello all. New to the framework (and elixir) and still a little fuzzy on how to implement my idea. Basically I want to stitch together a bunch of wav and/or mp3 files and stream them indefinitely. Like a queue where I can keep adding files and the pipeline should grab them as needed FIFO style.
The files will be downloaded via HTTP. So what I'm currently envisioning is a Bin that uses a Hackney source element to grab the file and push it on down. Then, when it's done, it will get replaced with a new Hackney source pointing to the next file.
Does that sound like the right idea, and are there any good examples you can point me to that do something similar?
Your problem can be solved in the following way:
Write a `Bin` that holds a queue of URLs to the files in its own state. The `Bin` should have one output pad with `availability: :always`. The `Bin` should spawn the following children:
- `child(:funnel, %Funnel{end_of_stream: :never}) |> bin_output()` - this should be spawned in `handle_init` (https://hexdocs.pm/membrane_funnel_plugin)
- `child({:hackney, url}, %Hackney.Source{location: url, hackney_opts: [follow_redirect: true]}) |> get_child(:funnel)` - the first Hackney source should be spawned when the first URL becomes available. `handle_element_end_of_stream` from `{:hackney, url}` in the `Bin` should terminate the `Hackney.Source` that has sent end of stream, spawn a new one, and link it to the funnel.
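The steps above could be put together roughly as follows. This is a minimal, untested sketch that assumes `membrane_core` (1.x), `membrane_funnel_plugin` and `membrane_hackney_plugin` are listed as dependencies. The module name `QueueSourceBin` and the `{:enqueue, url}` parent notification are hypothetical names chosen for illustration; only the two `child(...)` expressions come from the answer itself.

```elixir
defmodule QueueSourceBin do
  # Hypothetical module name; a sketch under the assumptions stated above.
  use Membrane.Bin

  alias Membrane.{Funnel, Hackney}

  def_output_pad :output, accepted_format: _any, availability: :always

  @impl true
  def handle_init(_ctx, _opts) do
    # The funnel lives as long as the bin and never emits end of stream itself.
    spec = child(:funnel, %Funnel{end_of_stream: :never}) |> bin_output()
    {[spec: spec], %{urls: :queue.new(), current: nil}}
  end

  # Assumption: the parent announces each new file with {:enqueue, url}.
  @impl true
  def handle_parent_notification({:enqueue, url}, _ctx, %{current: nil} = state) do
    # Nothing is playing, so the new URL can be started right away.
    {[spec: source_spec(url)], %{state | current: url}}
  end

  def handle_parent_notification({:enqueue, url}, _ctx, state) do
    # A source is already running; remember the URL for later (FIFO).
    {[], %{state | urls: :queue.in(url, state.urls)}}
  end

  @impl true
  def handle_element_end_of_stream({:hackney, url}, _pad, _ctx, state) do
    case :queue.out(state.urls) do
      {{:value, next}, rest} ->
        # Terminate the finished source and link a fresh one to the funnel.
        actions = [remove_children: {:hackney, url}, spec: source_spec(next)]
        {actions, %{state | urls: rest, current: next}}

      {:empty, _queue} ->
        # No more URLs yet: drop the finished source and wait for the next one.
        {[remove_children: {:hackney, url}], %{state | current: nil}}
    end
  end

  # End of stream from any other child (for example the funnel) is ignored.
  def handle_element_end_of_stream(_child, _pad, _ctx, state), do: {[], state}

  defp source_spec(url) do
    child({:hackney, url}, %Hackney.Source{
      location: url,
      hackney_opts: [follow_redirect: true]
    })
    |> get_child(:funnel)
  end
end
```

Because the child name is `{:hackney, url}`, this sketch assumes two consecutive URLs are never identical; a counter in the name would avoid that restriction.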
Keep in mind that if you want to put a decoder after the `Bin`, it might have problems with receiving many `mp3` files one after another from a single pad.
@Feliks thank you, this is perfect!
And we spawn the new children by returning a new `spec` from the callbacks?
Yes. Unfortunately, the example in `membrane_hackney_plugin` is a little outdated, so don't rely on it. You should also use the `:remove_children` action.
These links might be helpful:
- https://hexdocs.pm/membrane_core/Membrane.Bin.Action.html#t:remove_children/0
- https://membrane.stream/learn/get_started_with_membrane
- https://membrane.stream/learn
@Feliks follow up question: what do you think would be the best protocol on the sink end to stream the result to the browser in a Phoenix LiveView app? Preferably to be played in a standard HTML audio tag
If the audio that you want to play is static and predefined, you can just put the URL of a static file in an audio tag.
If it isn't static, you can use HLS to send it to the browser. As far as I know, that cannot be done with just an audio tag on the browser side, but there are JS libraries that can handle receiving HLS from a server (https://github.com/video-dev/hls.js).
The third option is WebRTC, but it is designed for live use cases and it might make server deployment harder.
Cool. I was trying to avoid bringing in a JS dependency but it looks like HLS might indeed be the best option
Hey @Feliks , do you see anything wrong with this spec? The pipeline is shutting down before hitting the "After spec" line:
Getting these logs:
And `handle_init` in the source bin is not executing.
I'm starting the pipeline under a supervision tree inside a `handle_event` in a live view, if that's relevant.
Turns out there was a problem with `options.input_schema`, as `options` is a keyword list, not a map. I never saw that log until I ran it from the app's root supervisor.
Ok, so is the problem solved?
That one is! Now I have a new one 😅
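For readers hitting the same keyword-list issue: dot access such as `options.input_schema` only works on maps and structs, and it raises at runtime when `options` is a keyword list. A small sketch of the access patterns that do work follows; the `OptionsAccess` module and the option values are made up for illustration, only the `input_schema` key comes from the thread.

```elixir
defmodule OptionsAccess do
  # Hypothetical helper: accepts options either as a keyword list or as a map.
  def input_schema(opts) when is_list(opts), do: Keyword.fetch!(opts, :input_schema)
  def input_schema(%{input_schema: schema}), do: schema
end

# Made-up option values, used only to exercise the helper.
opts = [input_schema: %{prompt: :string}, name: :source]

# Keyword lists support bracket access and the Keyword module, not dot access.
schema_from_brackets = opts[:input_schema]
schema_from_helper = OptionsAccess.input_schema(opts)

# Converting once to a map makes dot access valid again.
opts_map = Map.new(opts)
schema_from_map = opts_map.input_schema

IO.inspect({schema_from_brackets, schema_from_helper, schema_from_map})
```

`Keyword.fetch!/2` is used rather than `opts[:input_schema]` inside the helper so that a missing key fails loudly instead of silently returning `nil`.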
I'm getting an error about returning `setup: :complete` from `handle_setup`. It says the setup is already complete. Haven't had a chance to investigate yet, but I'll keep you posted.
Returning `setup: :complete` is reasonable only if you returned `setup: :incomplete` earlier in `handle_setup`, e.g. you return `setup: :incomplete` from `handle_setup` and, 1 second later, `setup: :complete` from `handle_info`.
Setup is completed by default after `handle_setup`; you have to return `setup: :incomplete` to change this behaviour.
I guess there is no need to return `setup: :complete` in your case.
Ah! That explains it. Thank you
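The deferred-setup pattern described above (return `setup: :incomplete` from `handle_setup`, then `setup: :complete` from `handle_info` a second later) might look like the sketch below. It assumes `membrane_core` 1.x is available; the module name `SlowSetupBin` and the `:setup_done` message are hypothetical.

```elixir
defmodule SlowSetupBin do
  # Hypothetical bin that postpones finishing its setup by one second.
  use Membrane.Bin

  @impl true
  def handle_setup(_ctx, state) do
    # Stand-in for real asynchronous work (opening a connection, warming a cache, ...).
    Process.send_after(self(), :setup_done, 1_000)
    # Tell the framework that setup is NOT finished when this callback returns.
    {[setup: :incomplete], state}
  end

  @impl true
  def handle_info(:setup_done, _ctx, state) do
    # Valid only because handle_setup returned setup: :incomplete earlier.
    {[setup: :complete], state}
  end

  # Any other message is ignored.
  def handle_info(_message, _ctx, state), do: {[], state}
end
```

If `handle_setup` does not return `setup: :incomplete`, the second action should simply be left out, which matches the advice given in the thread.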
Ever seen this one before @Feliks ?
And this is the spec:
This looks like `Membrane.WAV.Parser` received a buffer whose payload contains a field with a value that, for some reason (I don't know why), is not supported by our parser.
Where did you take your `.wav` data from?
It's generated by the MusicGen AI model running on replicate.com
the API outputs a URL like
https://replicate.delivery/pbxt/xe13ALmFxxQtX6qbQ2JzIQl4cF8835PeYKHexJaN5gIIQiQlA/out.wav
which I pass into the Hackney source
Your `.wav` contains a chunk-type field set to `"LIST"`, while our parser supports the `"data"` and `"fact"` values.
Interesting
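To check which chunk types a given `.wav` file actually contains, one can walk its RIFF chunks by hand. The sketch below relies only on the general RIFF/WAVE layout (4-byte chunk ID, 32-bit little-endian size, body padded to an even length); it is a diagnostic aid, not how `Membrane.WAV.Parser` is implemented, and the `WavChunks` module and the tiny in-memory sample are invented for illustration.

```elixir
defmodule WavChunks do
  # Hypothetical diagnostic helper: lists {chunk_id, size} pairs in file order.
  def list(<<"RIFF", _riff_size::little-32, "WAVE", rest::binary>>) do
    {:ok, walk(rest, [])}
  end

  def list(_other), do: {:error, :not_a_wav}

  defp walk(<<id::binary-size(4), size::little-32, rest::binary>>, acc) do
    # Chunk bodies are padded to an even number of bytes.
    padded = size + rem(size, 2)

    case rest do
      <<_body::binary-size(padded), tail::binary>> -> walk(tail, [{id, size} | acc])
      # Truncated input: report the last header seen and stop.
      _truncated -> Enum.reverse([{id, size} | acc])
    end
  end

  defp walk(_rest, acc), do: Enum.reverse(acc)
end

# Build a tiny in-memory WAV with "fmt ", "LIST" and "data" chunks.
fmt = <<1::little-16, 1::little-16, 44_100::little-32, 88_200::little-32, 2::little-16, 16::little-16>>

chunks =
  "fmt " <> <<byte_size(fmt)::little-32>> <> fmt <>
    "LIST" <> <<4::little-32>> <> "INFO" <>
    "data" <> <<4::little-32>> <> <<0, 0, 0, 0>>

wav = "RIFF" <> <<(byte_size(chunks) + 4)::little-32>> <> "WAVE" <> chunks

IO.inspect(WavChunks.list(wav))
```

Running the same function on a downloaded file (for example `File.read!/1` on a local copy) would show whether a `"LIST"` chunk sits between `"fmt "` and `"data"`.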
I can also get them delivered as mp3, but I didn't see any parsers for that, only decoders expecting an mp3 file rather than a bytestream
But I'm probably overlooking something
The MP3 decoder should be able to handle a bytestream that is not chunked in any specific way
Ok, I'll give that a shot
What do you want to do with this `mp3` then?
Send it through the funnel to be added to an ongoing HLS stream
basically the AI will be continually generating these files in the background and I will add them to the stream as they are delivered
So decode mp3 and encode raw audio to `AAC` and put it into HLS
right
@Feliks do I need to explicitly tell the pipeline to wait for its child bin to finish setup before playing?
`handle_playing` will always be called after finishing the setup, but various elements may finish setup and execute `handle_playing` independently.
Every element/bin (let's name it `:child`) executes `handle_playing` as soon as:
1. Its own setup is completed
2. Setups of all other elements/bins spawned in the same spec as `:child` are completed
3. The parent of `:child` is already playing
For pipelines it is simpler: every pipeline executes `handle_playing` just after finishing setup.
Setup is finished:
1. Just after `handle_setup`, if the action `{:setup, :incomplete}` wasn't returned from it.
2. Otherwise, just after returning the action `{:setup, :complete}`, if `handle_setup` had returned `{:setup, :incomplete}` earlier
If you shared your code, maybe I would be able to say what the reason for your problem is.
Sure thing!
I've also tried with all the conversion elements inside the source bin
And here is the error I'm currently getting:
I'm sure the hackney element's stream is ending much faster than I can generate new files so I'm not 100% sure how to handle that. I need it to gracefully wait for more files to be available
Yeah I guess I need to check if the queue is empty in the end of stream handler and put it in some kind of waiting mode
Then in the `:get_next_track` handler I can check if it is waiting and kick it off again
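The "waiting mode" idea can be modelled as a small pure state machine, independent of Membrane. In this sketch the `TrackQueue` module and the returned tuples (`{:start, url}`, `{:replace, done, next}`, `{:stop, done}`) are hypothetical; in a real bin they would be translated into `spec` and `remove_children` actions.

```elixir
defmodule TrackQueue do
  # Hypothetical bookkeeping struct: pending URLs plus the current status.
  defstruct pending: :queue.new(), status: :waiting

  def new, do: %__MODULE__{}

  # A new file arrived while nothing is playing: start it immediately.
  def add(%__MODULE__{status: :waiting} = tq, url) do
    {{:start, url}, %{tq | status: {:playing, url}}}
  end

  # A new file arrived while a track is playing: remember it for later.
  def add(%__MODULE__{} = tq, url) do
    {:none, %{tq | pending: :queue.in(url, tq.pending)}}
  end

  # The current track reached end of stream.
  def finished(%__MODULE__{status: {:playing, done}} = tq) do
    case :queue.out(tq.pending) do
      {{:value, next}, rest} ->
        {{:replace, done, next}, %{tq | pending: rest, status: {:playing, next}}}

      {:empty, _queue} ->
        # Nothing queued: go back to waiting until add/2 is called again.
        {{:stop, done}, %{tq | status: :waiting}}
    end
  end
end

{first, q1} = TrackQueue.add(TrackQueue.new(), "a.mp3")
{second, q2} = TrackQueue.add(q1, "b.mp3")
{third, q3} = TrackQueue.finished(q2)
{fourth, q4} = TrackQueue.finished(q3)
{fifth, _q5} = TrackQueue.add(q4, "c.mp3")

IO.inspect([first, second, third, fourth, fifth])
```

Keeping this logic free of framework calls makes the empty-queue case easy to test before wiring it into the end of stream handler.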
But even with this obvious problem, I'm still not sure why I'm getting that error. Seems that the AAC encoder is getting some data without a pts
But now that I think about it, I'm not sure the hackney source's end of stream handler is even being called. But the funnel's is
Here is a reworked end of stream handler:
Sorry for the delay
The error from the AAC FDK plugin occurs in changes that were introduced recently, so try downgrading this plugin to `{:membrane_aac_fdk_plugin, "0.18.2"}`
Thanks! Just spent a couple of hours on this, because I thought I was doing something wrong and had somehow messed up the pts in my pipeline.
I opened an issue here https://github.com/membraneframework/membrane_core/issues/791 to track it.
@skillet do you have an example stream you can share, to help reproduce the problem? I'm not able to share the one I have.
Thanks guys. @Aske I will put something together for you to reproduce it
@Aske @skillet does downgrading `membrane_aac_fdk_plugin` to `0.18.2` or lower solve your problem and allow you to go further?
Yes it worked for me!
Yes, downgrading worked for me! Thanks a lot for suggesting that! ❤️
@Aske @skillet We have just released version `0.18.8` of `membrane_aac_fdk_plugin`, fixing the bug that you encountered, so you should be able to update this plugin to the newest version and use it without such an error. Let me know if the new version causes any other problems.
Thank you @Feliks ! I'll try it out today
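For reference, the two version choices discussed in the thread would appear in the `deps` section of `mix.exs` roughly like this. It is a fragment only; the `~>` requirement is one possible way to ask for `0.18.8` or a later `0.18.x` release, not something prescribed in the thread.

```elixir
# mix.exs (fragment)
defp deps do
  [
    # Release that contains the fix mentioned in the thread.
    {:membrane_aac_fdk_plugin, "~> 0.18.8"}
    # Temporary workaround used earlier in the thread (exact pin):
    # {:membrane_aac_fdk_plugin, "0.18.2"}
  ]
end
```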