How do I invoke Pipeline.handle_info from a Phoenix.WebChannel?

I'm trying to pass audio from a Phoenix Web Channel to a Pipeline. Also, are there any notify_child usage examples? I don't understand how to use Membrane.Pipeline.notify_child. Do I need to define it first? If so, what should the definition look like? Update: I was able to pass a message to the pipeline by making the following changes to my Channel handle_inlike so:
def handle_in("new_audio", %{"data" => audio_data}, socket) do
IO.inspect("pipelines: ")
# IO.inspect(Membrane.Pipeline.list_pipelines())
[head | _] = Membrane.Pipeline.list_pipelines()
# IO.inspect(head)
send(head, audio_data)
{:noreply, socket}
end
def handle_in("new_audio", %{"data" => audio_data}, socket) do
IO.inspect("pipelines: ")
# IO.inspect(Membrane.Pipeline.list_pipelines())
[head | _] = Membrane.Pipeline.list_pipelines()
# IO.inspect(head)
send(head, audio_data)
{:noreply, socket}
end
But I got the error:
[error] <0.768.0>/ Error occured in Membrane Pipeline:
%Membrane.CallbackError{
message: "Invalid value returned from AudioTranscription.StreamToFile.:handle_info:
[error] <0.768.0>/ Error occured in Membrane Pipeline:
%Membrane.CallbackError{
message: "Invalid value returned from AudioTranscription.StreamToFile.:handle_info:
This is what my Membrane.Pipeline.handle_info looks like
@impl true
def handle_info(audio_data, ctx, _state) do
# Process the audio data
IO.inspect("ctx")
IO.inspect(ctx)
IO.inspect("audio_data")
IO.inspect(audio_data)
# ctx.notify_child(AudioTranscription.Source, audio_data)
# AudioTranscription.AudioSource.receive_audio_data(audio_data)
end
@impl true
def handle_info(audio_data, ctx, _state) do
# Process the audio data
IO.inspect("ctx")
IO.inspect(ctx)
IO.inspect("audio_data")
IO.inspect(audio_data)
# ctx.notify_child(AudioTranscription.Source, audio_data)
# AudioTranscription.AudioSource.receive_audio_data(audio_data)
end
7 Replies
Noarkhh
Noarkhh•6mo ago
Hi @Villain In Glasses All Membrane.Pipeline and Membrane.Element callbacks need to return a tuple of Actions and the next state (as shown here) notify_child is an action, so to use it you need to return it from the handle_info callback. In your case the return value in handle_info should probably look like this:
{[notify_child: {<child_name>, <your_message>}], state}
{[notify_child: {<child_name>, <your_message>}], state}
Thanks to this the specified child will receive the message after the handle_info callback execution finishes
Villain In Glasses
Villain In GlassesOP•6mo ago
Oh, so that's how you use it. Thanks for the clarification. Also, now I'm getting another error:
Error occured in Membrane Pipeline:
%Membrane.UnknownChildError{
message: "Child of name AudioTranscription.Source doesn't exist. Available children are [].\n"
}
Error occured in Membrane Pipeline:
%Membrane.UnknownChildError{
message: "Child of name AudioTranscription.Source doesn't exist. Available children are [].\n"
}
Is this the wrong way to define specify children?
@impl true
def handle_init(_) do
spec =
child(AudioTranscription.Source)
|> child(%Membrane.File.Sink{location: "output.wav"})

{[spec: spec], %{}}
end
@impl true
def handle_init(_) do
spec =
child(AudioTranscription.Source)
|> child(%Membrane.File.Sink{location: "output.wav"})

{[spec: spec], %{}}
end
Noarkhh
Noarkhh•6mo ago
to give your child a name that can be used to locate this child you need to explicitly specify it in the child function, otherwise the child will be anonymous. You could specify the name like this:
child(:source, AudioTranscription.Source)
child(:source, AudioTranscription.Source)
and the child would be available under name :source
Villain In Glasses
Villain In GlassesOP•6mo ago
Nope, I updated the code to:
@impl true
def handle_init(_) do
spec =
child(:source, AudioTranscription.Source)
|> child(:sink, %Membrane.File.Sink{location: "output.wav"})

{[spec: spec], %{}}
end

@impl true
def handle_info(audio_data, ctx, state) do
# Process the audio data
{[notify_child: {:source, audio_data}], state}
end
@impl true
def handle_init(_) do
spec =
child(:source, AudioTranscription.Source)
|> child(:sink, %Membrane.File.Sink{location: "output.wav"})

{[spec: spec], %{}}
end

@impl true
def handle_info(audio_data, ctx, state) do
# Process the audio data
{[notify_child: {:source, audio_data}], state}
end
I'm still getting:
[error] <0.768.0>/ Error occured in Membrane Pipeline:
%Membrane.UnknownChildError{
message: "Child of name :source doesn't exist. Available children are [].\n"
[error] <0.768.0>/ Error occured in Membrane Pipeline:
%Membrane.UnknownChildError{
message: "Child of name :source doesn't exist. Available children are [].\n"
Noarkhh
Noarkhh•6mo ago
handle_init has arity of 2, try
def handle_init(_ctx, _opts) do
...
def handle_init(_ctx, _opts) do
...
Villain In Glasses
Villain In GlassesOP•6mo ago
Thank you so much! That worked 😄
Noarkhh
Noarkhh•6mo ago
no problem, happy to help ;)
Want results from more Discord servers?
Add your server