Testing Membrane Element

I'm trying to setup a simple test of a membrane element, but I'm stumped a bit on how to assert that the element is sending the correct output. For context, this is an element that accepts an audio stream, sends it to a speech 2 text api and then forwards along a text buffer. The test fails, as there's no 'buffer' in the mailbox. Though I'm positive that's what my element emits when the stream is complete. I've tried longer timeouts (10s) but that doesn't alter the test. I could use some advice.
test "Converts sample audio to text" do
current_dir = __DIR__
file_path = Path.join([current_dir, "../../fixtures/test.wav"])
absolute_path = Path.expand(file_path)
sample_audio = File.read!(absolute_path)

expected_text =
"Adventure 1 a scandal in Bohemia from The Adventures of Sherlock Holmes by Sir Arthur Conan Doyle. This is a LibriVox recording."

links = [
child(:source, %Source{output: [sample_audio], stream_format: %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 1}})
|> child(:speech_2_text, Speech2Text)
|> child(:sink, %Sink{})
]

options = [
spec: links
]

pipeline = Pipeline.start_link_supervised!(options)

assert_sink_buffer(pipeline, :sink, {:buffer, %Buffer{payload: ^expected_text}}, 5_000)

Pipeline.terminate(pipeline)
end
test "Converts sample audio to text" do
current_dir = __DIR__
file_path = Path.join([current_dir, "../../fixtures/test.wav"])
absolute_path = Path.expand(file_path)
sample_audio = File.read!(absolute_path)

expected_text =
"Adventure 1 a scandal in Bohemia from The Adventures of Sherlock Holmes by Sir Arthur Conan Doyle. This is a LibriVox recording."

links = [
child(:source, %Source{output: [sample_audio], stream_format: %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 1}})
|> child(:speech_2_text, Speech2Text)
|> child(:sink, %Sink{})
]

options = [
spec: links
]

pipeline = Pipeline.start_link_supervised!(options)

assert_sink_buffer(pipeline, :sink, {:buffer, %Buffer{payload: ^expected_text}}, 5_000)

Pipeline.terminate(pipeline)
end
13 Replies
mat_hek
mat_hekā€¢10mo ago
Hi @TonyLikeSocks, what sink are you using? The assertions only work for Membrane.Testing.Sink
TonyLikeSocks
TonyLikeSocksOPā€¢10mo ago
Yup. Iā€™m using the Membrane.Testing library
mat_hek
mat_hekā€¢10mo ago
Ok, so it should be assert_sink_buffer(pipeline, :sink, %Buffer{payload: ^expected_text}, 5_000) instead of assert_sink_buffer(pipeline, :sink, {:buffer, %Buffer{payload: ^expected_text}}, 5_000)
TonyLikeSocks
TonyLikeSocksOPā€¢10mo ago
Hmm. Still not working. My element ends the stream when we get a VAD of :silence. These are the last two lines of the handle_event call for the VAD
{actions, state} = end_stream(state)
{[forward: event] ++ actions, %{state | active?: false}}
{actions, state} = end_stream(state)
{[forward: event] ++ actions, %{state | active?: false}}
The end_stream has this case as the last clause:
case Instance.receive_stream_data(state.channel, state.stream, 5000) do
{:ok, text} ->
{[buffer: {:output, %Buffer{payload: text}}], %{state | stream: nil}}

{:error, error} ->
Membrane.Logger.error("Failed to receive stream data.\nReason: #{inspect(error)}")
{[buffer: {:output, %Buffer{payload: ""}}], %{state | stream: nil}}
end
case Instance.receive_stream_data(state.channel, state.stream, 5000) do
{:ok, text} ->
{[buffer: {:output, %Buffer{payload: text}}], %{state | stream: nil}}

{:error, error} ->
Membrane.Logger.error("Failed to receive stream data.\nReason: #{inspect(error)}")
{[buffer: {:output, %Buffer{payload: ""}}], %{state | stream: nil}}
end
Which led me to think I needed to match on the buffer. But it doesn't seem like it's arriving to the sink The test output, gives me these six events:
value: {Membrane.Testing.Pipeline, #PID<0.678.0>, :setup}
value: {Membrane.Testing.Pipeline, #PID<0.678.0>, {:handle_child_notification, {:playing, :sink}}}
{
Membrane.Testing.Pipeline,
#PID<0.678.0>,
{
:handle_child_notification,
{
{:stream_format, :input, %Callsanta.Endpoints.Santa.Text{alignment: :full, paragraph_limit_event?: false}},
:sink
}
}
}
{Membrane.Testing.Pipeline, #PID<0.678.0>, {:handle_element_start_of_stream, {:speech_2_text, :input}}}
{Membrane.Testing.Pipeline, #PID<0.678.0>, {:handle_element_end_of_stream, {:speech_2_text, :input}}}
value: {Membrane.Testing.Pipeline, #PID<0.678.0>, :setup}
value: {Membrane.Testing.Pipeline, #PID<0.678.0>, {:handle_child_notification, {:playing, :sink}}}
{
Membrane.Testing.Pipeline,
#PID<0.678.0>,
{
:handle_child_notification,
{
{:stream_format, :input, %Callsanta.Endpoints.Santa.Text{alignment: :full, paragraph_limit_event?: false}},
:sink
}
}
}
{Membrane.Testing.Pipeline, #PID<0.678.0>, {:handle_element_start_of_stream, {:speech_2_text, :input}}}
{Membrane.Testing.Pipeline, #PID<0.678.0>, {:handle_element_end_of_stream, {:speech_2_text, :input}}}
None of which are the buffer I'm expecting Is this a timing issue? Do I need to assert on end of stream first, and then check that the buffer I'm expecting arrived? Looking at: https://github.com/membraneframework/membrane_core/issues/239 and https://github.com/orgs/membraneframework/discussions/715
mat_hek
mat_hekā€¢10mo ago
The discussion is about the fact that no matter if you do assert_sink_buffer first and then assert_end_of_stream or the other way, it would work even if the end_of_stream arrived before the buffer, so I don't think it's related If you have doubts whether the buffer arrives, you can plug the %Membrane.Debug.Filter{handle_buffer: &IO.inspect/1} before the sink
mat_hek
mat_hekā€¢10mo ago
I prepared a script similar to your code that has the testing source and the sink and it works for me. If you plug your element in there and it stops working, then the element probably consumes the first buffer and doesn't output anything.
TonyLikeSocks
TonyLikeSocksOPā€¢10mo ago
Thank you. That's super helpful I'm going over my code more closely, and it buffers any incoming stream until it receives a vad :speech event. So it seems like I need to do something like:
Pipeline.message_child(pipeline, :speech_2_text, %VoiceActivityChanged{voice_activity: :speech})
Pipeline.message_child(pipeline, :speech_2_text, %VoiceActivityChanged{voice_activity: :speech})
But that's not tripping the handle_info call in my element.
mat_hek
mat_hekā€¢10mo ago
Pipeline.message_child actually sends a notification, so handle_parent_notification will be executed instead of handle_info. It's quite counter intuitive, we should rename it to notify_child
TonyLikeSocks
TonyLikeSocksOPā€¢10mo ago
ahh. Is there a way to send a message that trips a handle_info block? Pipeline.execute_actions ?
mat_hek
mat_hekā€¢10mo ago
you can call Pipeline.get_child_pid and do a regular send
TonyLikeSocks
TonyLikeSocksOPā€¢10mo ago
šŸ‘ I'll give that a shot and let you know if I still have problems. Thanks so much for the help Like this?
s2t_pid = Pipeline.get_child_pid!(pipeline, :speech_2_text)
send(s2t_pid, %VoiceActivityChanged{voice_activity: :speech})
s2t_pid = Pipeline.get_child_pid!(pipeline, :speech_2_text)
send(s2t_pid, %VoiceActivityChanged{voice_activity: :speech})
That's not tripping the handle_event in the element I realize I had a typo above. I kept saying "handle_info" and what I meant was handle_event. So I need to simulate a VAD event flowing through the pipeline. The send/2 function definitely trips a handle_info in my element, but what I need is to trip a handle_event
mat_hek
mat_hekā€¢10mo ago
You only can trigger handle_event by sending an event from a preceding element. To do that in the testing source, you should pass {initial_state, generator} instead of enumerable as the value of output, and the generator should return the buffer, the event and the end of stream action. See the docs for details: https://hexdocs.pm/membrane_core/Membrane.Testing.Source.html#module-element-options
TonyLikeSocks
TonyLikeSocksOPā€¢10mo ago
Ah ha! This unblocked me. Thank you!
Want results from more Discord servers?
Add your server