Writing a `Bin` queuing content from multiple remote files

@skillet wrote in https://discord.com/channels/464786597288738816/1007192081107791902/1224491418626560121
Hello all. New to the framework (and elixir) and still a little fuzzy on how to implement my idea. Basically I want to stitch together a bunch of wav and/or mp3 files and stream them indefinitely. Like a queue where I can keep adding files and the pipeline should grab them as needed FIFO style. The files will be downloaded via HTTP. So what I'm currently envisioning is a Bin that uses a Hackney source element to grab the file and push it on down. Then, when it's done it will get replaced with a new Hackney source pointing to the next file. Does that sound like the right idea, and are there any good examples you can point me to that do something similar?
Feliks
FeliksOP10mo ago
Your problem can be solved in the following way: Write a Bin, that holds in its own state queue of urls to the files. Bin should have one output pad with availability: :always. Bin should spawn following children:
- child(:funnel, %Funnel{end_of_stream: :never}) |> bin_output() - this should be spawned in handle_init (https://hexdocs.pm/membrane_funnel_plugin)
- child({:hackney, url}, %Hackney.Source{location: url, hackney_opts: [follow_redirect: true]}) |> get_child(:funnel) - the first Hackney source should be spawned when the first URL becomes available.
When the Bin receives handle_element_end_of_stream from {:hackney, url}, it should terminate the Hackney.Source that sent the end of stream, spawn a new one, and link it to the funnel. Keep in mind that if you put a decoder after the Bin, it might have problems receiving many MP3 files one after another on a single pad.
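A minimal sketch of the Bin described above, assuming membrane_core 1.x with membrane_funnel_plugin and membrane_hackney_plugin as dependencies. The module name QueuedSourceBin, the {:enqueue, url} parent notification, and the :waiting notification sent back to the parent are illustrative assumptions, not part of any Membrane API.

```elixir
defmodule QueuedSourceBin do
  use Membrane.Bin

  alias Membrane.{Funnel, Hackney}

  def_output_pad :output, accepted_format: _any, availability: :always

  @impl true
  def handle_init(_ctx, _opts) do
    # The funnel never forwards end of stream, so the output stays open between files.
    spec = child(:funnel, %Funnel{end_of_stream: :never}) |> bin_output()
    {[spec: spec], %{queue: :queue.new(), current: nil}}
  end

  # Hypothetical notification shape: the parent sends {:enqueue, url} for every new file.
  @impl true
  def handle_parent_notification({:enqueue, url}, _ctx, %{current: nil} = state) do
    # Nothing is being downloaded right now, so start this file immediately.
    {[spec: source_spec(url)], %{state | current: url}}
  end

  def handle_parent_notification({:enqueue, url}, _ctx, state) do
    {[], %{state | queue: :queue.in(url, state.queue)}}
  end

  @impl true
  def handle_element_end_of_stream({:hackney, _url} = finished, _pad, _ctx, state) do
    remove = {:remove_children, [finished]}

    case :queue.out(state.queue) do
      {{:value, url}, rest} ->
        # Caveat: child names must be unique, so enqueueing the same URL twice in a row
        # would need a different naming scheme (for example a counter).
        {[remove, spec: source_spec(url)], %{state | queue: rest, current: url}}

      {:empty, _queue} ->
        # Nothing left to fetch: go idle and let the parent know.
        {[remove, notify_parent: :waiting], %{state | current: nil}}
    end
  end

  # End of stream from any other child (e.g. the funnel) is ignored.
  def handle_element_end_of_stream(_child, _pad, _ctx, state), do: {[], state}

  defp source_spec(url) do
    child({:hackney, url}, %Hackney.Source{location: url, hackney_opts: [follow_redirect: true]})
    |> get_child(:funnel)
  end
end
```

A parent pipeline would feed it with an action such as `{:notify_child, {:source_bin, {:enqueue, url}}}`, where :source_bin is whatever name the bin was given in the parent's spec.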
skillet
skillet10mo ago
@Feliks thank you, this is perfect! And we spawn the new children by returning a new spec from the callbacks?
Feliks
FeliksOP10mo ago
Yes. Unfortunately, the example in membrane_hackney_plugin is a little outdated, so don't rely on it. You should also use the :remove_children action. These links might be helpful:
- https://hexdocs.pm/membrane_core/Membrane.Bin.Action.html#t:remove_children/0
- https://membrane.stream/learn/get_started_with_membrane
- https://membrane.stream/learn
skillet
skillet10mo ago
@Feliks follow up question: what do you think would be the best protocol on the sink end to stream the result to the browser in a Phoenix LiveView app? Preferably to be played in a standard HTML audio tag
Feliks
FeliksOP10mo ago
If the audio you want to play is static and pre-defined, you can just put the URL of a static file in an audio tag. If it isn't static, you can use HLS to send it to the browser. As far as I know, that cannot be done with just an audio tag on the browser side, but there are JS libraries that can handle receiving HLS from a server (https://github.com/video-dev/hls.js). The third option is WebRTC, but it is designed for live use cases and it might make server deployment harder.
skillet
skillet10mo ago
Cool. I was trying to avoid bringing in a JS dependency but it looks like HLS might indeed be the best option Hey @Feliks , do you see anything wrong with this spec? The pipeline is shutting down before hitting the "After spec" line:
@impl true
def handle_init(_ctx, options) do
  IO.inspect(options, label: "Options: ")
  sink_bin = %HTTPAdaptiveStream.SinkBin{
    manifest_module: HTTPAdaptiveStream.HLS,
    storage: %FileStorage{directory: "priv/static/audio"},
    mode: :live,
    persist?: true
  }

  IO.inspect("Before spec")

  spec =
    child(:source_bin, %ReplicateSourceBin{input_schema: options.input_schema})
    |> child(:sink_bin, sink_bin)

  IO.inspect("After spec")

  {[spec: spec], %{}}
end
Getting these logs:
"Before spec"
[debug] ReplicatePipeline/ subprocess supervisor got exit request from parent, reason: :shutdown, shutting down children
And handle_init in the source bin is not executing. I'm starting the pipeline under a supervision tree inside a handle_event in a live view, if that's relevant.
Turns out there was a problem with options.input_schema, as options is a keyword list, not a map. I never saw that log until I ran it from the app's root supervisor.
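The keyword-list pitfall above can be sketched in plain Elixir. This assumes the pipeline options arrive exactly as the caller passed them; the OptionsDemo module and the :input_schema value are made up for illustration.

```elixir
defmodule OptionsDemo do
  # Accept either shape and always return a map, so later code can use dot access.
  def normalize(options) when is_list(options), do: Map.new(options)
  def normalize(%{} = options), do: options
end

# Options as a keyword list (placeholder value).
options = [input_schema: %{prompt: "lo-fi beat"}]

# Dot access (options.input_schema) only works on maps and structs.
# On a keyword list, use the Keyword API or the access syntax instead:
schema = Keyword.fetch!(options, :input_schema)
^schema = options[:input_schema]

# Or convert once at the boundary and keep using dot access afterwards.
^schema = OptionsDemo.normalize(options).input_schema
```

Converting once in handle_init keeps the rest of the callbacks independent of how the pipeline was started.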
Feliks
FeliksOP10mo ago
Ok, so is the problem solved?
skillet
skillet10mo ago
That one is! Now I have a new one 😅 I'm getting an error about returning setup: :complete from handle_setup. It says the setup is already complete. Haven't had a chance to investigate yet but I'll keep you posted
Feliks
FeliksOP10mo ago
Returning setup: :complete is reasonable only if you returned setup: :incomplete earlier in handle_setup, e.g. you return setup: :incomplete from handle_setup and, 1 second later, setup: :complete from handle_info. Setup is completed by default after handle_setup; you have to return setup: :incomplete to change this behaviour. I guess there is no need to return setup: :complete in your case.
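A small sketch of the deferred-setup pattern described here, assuming membrane_core 1.x. The module name, the :resources_ready message, and the one-second delay are illustrative assumptions.

```elixir
defmodule DelayedSetupPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_setup(_ctx, state) do
    # Pretend some resource needs one second to become ready (hypothetical delay).
    Process.send_after(self(), :resources_ready, 1_000)
    # Tell Membrane that setup is NOT finished when this callback returns.
    {[setup: :incomplete], state}
  end

  @impl true
  def handle_info(:resources_ready, _ctx, state) do
    # Valid only because handle_setup returned setup: :incomplete earlier.
    {[setup: :complete], state}
  end

  def handle_info(_other, _ctx, state), do: {[], state}

  @impl true
  def handle_playing(_ctx, state) do
    # Runs only after setup has been marked complete.
    IO.puts("setup finished, pipeline is playing")
    {[], state}
  end
end
```

If handle_setup returns no setup action at all, setup completes on its own, and a later setup: :complete is what triggers the "already complete" error mentioned above.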
skillet
skillet10mo ago
Ah! That explains it. Thank you.
Ever seen this one before @Feliks ?
[error] GenServer #PID<0.916.0> terminating
** (CaseClauseError) no case clause matching: "LIST"
(membrane_wav_plugin 0.10.1) lib/membrane_wav/parser.ex:162: Membrane.WAV.Parser.parse_payload/3
Last message: {Membrane.Core.Message, :buffer, [%Membrane.Buffer{payload: <<82, 73, 70, 70, 70, 196, 9, 0, 87, 65, 86, 69, 102, 109, 116, 32, 16, 0, 0, 0, 1, 0, 2, 0, 0, 125, 0, 0, 0, 244, 1, 0, 4, 0, 16, 0, 76, 73, 83, 84, 26, 0, 0, 0, 73, ...>>, pts: nil, dts: nil, metadata: %{}}], [for_pad: :input]}
And this is the spec:
spec =
  child(:hackney, %Hackney.Source{location: output, hackney_opts: [follow_redirect: true]})
  |> child(:parser, Membrane.WAV.Parser)
  |> get_child(:funnel)
Feliks
FeliksOP10mo ago
It looks like Membrane.WAV.Parser received a buffer whose payload contains a field with a value that, for some reason (idk why), is not supported by our parser. Where did you get your .wav data from?
skillet
skillet10mo ago
It's generated by the MusicGen AI model running on replicate.com. The API outputs a URL like https://replicate.delivery/pbxt/xe13ALmFxxQtX6qbQ2JzIQl4cF8835PeYKHexJaN5gIIQiQlA/out.wav which I pass into the Hackney source.
Feliks
FeliksOP10mo ago
Your .wav contains some field specifying chunk type set to "LIST", while our parser supports "data" and "fact" values.
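To see which chunks a WAV file actually contains, the RIFF structure can be walked with binary pattern matching. In the crash log above, the first bytes decode to RIFF, WAVE, a 16-byte "fmt " chunk and then "LIST". The sketch below is plain Elixir and is not part of membrane_wav_plugin; the RiffChunks module and the synthetic sample file are illustrative assumptions.

```elixir
defmodule RiffChunks do
  @moduledoc "Lists chunk IDs and sizes of a RIFF/WAVE binary (illustrative helper)."

  # A RIFF file starts with "RIFF", a 32-bit little-endian size and the form type "WAVE".
  def list(<<"RIFF", _size::little-32, "WAVE", rest::binary>>), do: walk(rest, [])
  def list(_other), do: {:error, :not_a_wave_file}

  # Each chunk is a 4-byte ID, a 32-bit little-endian body size and the body,
  # padded to an even number of bytes.
  defp walk(<<id::binary-size(4), size::little-32, rest::binary>>, acc) do
    padded = size + rem(size, 2)

    case rest do
      <<_body::binary-size(padded), next::binary>> -> walk(next, [{id, size} | acc])
      # Truncated body (e.g. only the first buffer of a stream): record it and stop.
      _ -> {:ok, Enum.reverse([{id, size} | acc])}
    end
  end

  defp walk(_trailing, acc), do: {:ok, Enum.reverse(acc)}

  # Synthetic file with a LIST chunk between "fmt " and "data" (made-up contents).
  def synthetic_wav do
    fmt =
      <<1::little-16, 2::little-16, 32_000::little-32, 128_000::little-32, 4::little-16,
        16::little-16>>

    info = "INFO" <> chunk("ISFT", "sketch")
    body = "WAVE" <> chunk("fmt ", fmt) <> chunk("LIST", info) <> chunk("data", <<0, 0, 0, 0>>)
    "RIFF" <> <<byte_size(body)::little-32>> <> body
  end

  defp chunk(id, body), do: id <> <<byte_size(body)::little-32>> <> body
end

IO.inspect(RiffChunks.list(RiffChunks.synthetic_wav()))
```

A parser that only expects "fact" or "data" after "fmt " would stop at the LIST entry, which typically carries metadata such as the name of the encoding software.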
skillet
skillet10mo ago
Interesting. I can also get them delivered as mp3, but I didn't see any parsers for that, only decoders expecting an mp3 file rather than a bytestream. But I'm probably overlooking something.
Feliks
FeliksOP10mo ago
MP3 decoder should be able to handle bytestream, that is not chunked in any specific way
skillet
skillet10mo ago
Ok, I'll give that a shot
Feliks
FeliksOP10mo ago
What do you want to do with this mp3 then?
skillet
skillet10mo ago
Send it through the funnel to be added to an ongoing HLS stream. Basically the AI will be continually generating these files in the background and I will add them to the stream as they are delivered.
Feliks
FeliksOP10mo ago
So decode the mp3, encode the raw audio to AAC, and put it into HLS
skillet
skillet10mo ago
Right.
@Feliks do I need to explicitly tell the pipeline to wait for its child bin to finish setup before playing?
Feliks
FeliksOP10mo ago
handle_playing will always be called after setup finishes, but various elements may finish setup and execute handle_playing independently. Every element/bin (let's name it :child) executes handle_playing as soon as:
1. Its own setup is completed
2. Setups of all other elements/bins spawned in the same spec as :child are completed
3. The parent of :child is already playing
For pipelines it is simpler: every pipeline executes handle_playing just after finishing setup. Setup is finished:
1. Just after handle_setup, if the action {:setup, :incomplete} wasn't returned from it.
2. Otherwise, just after returning the action {:setup, :complete}, if handle_setup had returned {:setup, :incomplete} earlier
If you share your code, I may be able to tell what is causing your problem.
skillet
skillet10mo ago
Sure thing!
skillet
skillet10mo ago
I've also tried with all the conversion elements inside the source bin. And here is the error I'm currently getting:
[error] GenServer #PID<0.830.0> terminating
** (ArithmeticError) bad argument in arithmetic expression
:erlang.-(nil, nil)
(membrane_aac_fdk_plugin 0.18.7) lib/membrane_aac_fdk_plugin/encoder.ex:281: Membrane.AAC.FDK.Encoder.validate_pts_integrity/2
(membrane_aac_fdk_plugin 0.18.7) lib/membrane_aac_fdk_plugin/encoder.ex:165: Membrane.AAC.FDK.Encoder.handle_buffer/4
(membrane_core 1.0.1) lib/membrane/core/callback_handler.ex:139: Membrane.Core.CallbackHandler.exec_callback/4
(membrane_core 1.0.1) lib/membrane/core/callback_handler.ex:69: Membrane.Core.CallbackHandler.exec_and_handle_callback/5
(elixir 1.16.2) lib/enum.ex:2528: Enum."-reduce/3-lists^foldl/2-0-"/3
(membrane_core 1.0.1) lib/membrane/core/element.ex:232: Membrane.Core.Element.handle_info/2
(stdlib 5.2) gen_server.erl:1095: :gen_server.try_handle_info/3
(stdlib 5.2) gen_server.erl:1183: :gen_server.handle_msg/6
(stdlib 5.2) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
I'm sure the hackney element's stream is ending much faster than I can generate new files, so I'm not 100% sure how to handle that. I need it to gracefully wait for more files to be available.
Yeah, I guess I need to check if the queue is empty in the end of stream handler and put it in some kind of waiting mode. Then in the :get_next_track handler I can check if it is waiting and kick it off again.
But even with this obvious problem, I'm still not sure why I'm getting that error. Seems that the AAC encoder is getting some data without a pts.
But now that I think about it, I'm not sure the hackney source's end of stream handler is even being called. But the funnel's is.
Here is a reworked end of stream handler:
@impl true
def handle_element_end_of_stream(
      {:hackney, _url} = element,
      _pad,
      _context,
      %{queue: queue} = state
    ) do
  # A single action tuple, so the returned action list stays flat
  # (prepending a keyword list would nest one list inside another).
  remove_hackney = {:remove_children, [element]}

  case LimitedQueue.pop(queue) do
    {:ok, queue, next_track} ->
      spec =
        child({:hackney, next_track}, %Hackney.Source{
          location: next_track,
          hackney_opts: [follow_redirect: true]
        })
        |> get_child(:funnel)

      {[remove_hackney, spec: spec], %{state | queue: queue}}

    _ ->
      # Queue is empty: remove the finished source and tell the parent we are waiting.
      {[remove_hackney, notify_parent: :waiting], state}
  end
end
Feliks
FeliksOP10mo ago
Sorry for the delay. The error from the AAC FDK plugin occurs in changes that were introduced recently, so try downgrading this plugin to {:membrane_aac_fdk_plugin, "0.18.2"}
Aske
Aske10mo ago
Thanks! Just spent a couple of hours on this, because I thought I was doing something wrong and had somehow messed up the pts in my pipeline.
Aske
Aske10mo ago
I opened an issue here https://github.com/membraneframework/membrane_core/issues/791 to track it. @skillet do you have an example stream you can share, to help reproduce the problem? I'm not able to share the one I have.
skillet
skillet10mo ago
Thanks guys. @Aske I will put something together for you to reproduce it
Feliks
FeliksOP10mo ago
@Aske @skillet does downgrading membrane_aac_fdk_plugin to 0.18.2 or lower solve your problem and allow you to go further?
skillet
skillet10mo ago
Yes it worked for me!
Aske
Aske10mo ago
Yes, downgrading worked for me! Thanks a lot for suggesting that! ❤️
Feliks
FeliksOP10mo ago
@Aske @skillet We have just released version 0.18.8 of membrane_aac_fdk_plugin, which fixes the bug you ran into, so you should be able to update this plugin to the newest version and use it without that error. Let me know if the new version causes any other problems.
skillet
skillet10mo ago
Thank you @Feliks ! I'll try it out today
