Testing a filter with flow control :auto?

I'm trying to test a Membrane filter whose flow control is set to :auto. I'm using Membrane.Testing.Source and passing in a custom generator function. However, the generator appears to be called only once. Am I setting this testing pipeline up incorrectly?
4 Replies
TonyLikeSocks
TonyLikeSocksOP2w ago
# Test setup: feeds chunked raw audio from a file through a SpeechToText
# element, using Membrane.Testing.Source driven by a custom generator function.
alias Membrane.Testing.{Pipeline, Source, Sink}

# Read the file and discard its first 44 bytes.
# NOTE(review): presumably this skips a WAV header - confirm for the input file.
sample_audio =
File.read!(absolute_path)
|> then(fn <<_head::binary-size(44), raw_audio::binary>> -> raw_audio end)

# Split the audio into 1024-byte chunks. The comprehension only yields full
# chunks, so a trailing remainder shorter than chunk_size is not in `chunks`.
chunk_size = 1024
chunks = for <<chunk::binary-size(chunk_size) <- sample_audio>>, do: chunk
# Handle any remaining bytes in the last chunk
# Omitted

IO.puts("Number of chunks: #{length(chunks)}")

# Generator for Membrane.Testing.Source: receives the current state and the
# demanded size. State 0 emits the speech event; states 1..length(chunks)
# each emit one audio chunk; any later state emits nothing.
source_generator = fn state, size ->
IO.puts("Source generator called with state #{state}, size: #{size}")
case {state, size} do
# `chunk_size` is not pinned (^) in these patterns, so it is rebound to the
# demanded `size` and each clause matches any size (the log shows size: 400).
{0, chunk_size} ->
IO.puts("Generator called with state 0, size: #{chunk_size}")
# Emit only the voice-activity event and advance the state to 1.
# NOTE(review): per the reply later in this thread, appending
# `redemand: :output` to the returned actions is what gets the generator
# called again.
{[{:event, {:output, %VoiceActivityChanged{voice_activity: :speech}}}], 1}

{state, chunk_size} when state <= length(chunks) ->
IO.puts("Generator called with state #{state}, size: #{chunk_size}")
# State n maps to the (n - 1)-th chunk because state 0 was used for the event.
chunk = Enum.at(chunks, state - 1)
IO.puts("Chunk size: #{byte_size(chunk)}")

# NOTE(review): this branch returns a bare action list, unlike the other
# branches which return `{actions, new_state}`; the author acknowledges this
# as an error later in the thread. The state is also never advanced here.
[{:buffer, {:output, %Buffer{payload: chunk}}}]

{state, chunk_size} ->
IO.puts("Generator called with final state #{state}, size: #{chunk_size}")
# Past the last chunk: no actions, state unchanged.
{[], state}
end
end

# Linear pipeline: testing source -> speech-to-text element under test -> testing sink.
links = [
child(:source, %Source{
# {initial_state, generator}: the generator starts with state 0.
output: {0, source_generator},
stream_format: %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 1}
})
|> child(:speech_to_text, %SpeechToText{
provider: :fireworksai,
personality_type: :test,
transcribe_options: [
temperature: 0.5,
language: "en",
prompt: "This is a transcript of an interview conversation. Add proper punctuation and capitalization."
]
})

|> child(:sink, %Sink{})
]

# Start the testing pipeline with the spec above.
pipeline = Pipeline.start_link_supervised!(spec: links)
# Test setup: feeds chunked raw audio from a file through a SpeechToText
# element, using Membrane.Testing.Source driven by a custom generator function.
alias Membrane.Testing.{Pipeline, Source, Sink}

# Read the file and discard its first 44 bytes.
# NOTE(review): presumably this skips a WAV header - confirm for the input file.
sample_audio =
File.read!(absolute_path)
|> then(fn <<_head::binary-size(44), raw_audio::binary>> -> raw_audio end)

# Split the audio into 1024-byte chunks. The comprehension only yields full
# chunks, so a trailing remainder shorter than chunk_size is not in `chunks`.
chunk_size = 1024
chunks = for <<chunk::binary-size(chunk_size) <- sample_audio>>, do: chunk
# Handle any remaining bytes in the last chunk
# Omitted

IO.puts("Number of chunks: #{length(chunks)}")

# Generator for Membrane.Testing.Source: receives the current state and the
# demanded size. State 0 emits the speech event; states 1..length(chunks)
# each emit one audio chunk; any later state emits nothing.
source_generator = fn state, size ->
IO.puts("Source generator called with state #{state}, size: #{size}")
case {state, size} do
# `chunk_size` is not pinned (^) in these patterns, so it is rebound to the
# demanded `size` and each clause matches any size (the log shows size: 400).
{0, chunk_size} ->
IO.puts("Generator called with state 0, size: #{chunk_size}")
# Emit only the voice-activity event and advance the state to 1.
# NOTE(review): per the reply later in this thread, appending
# `redemand: :output` to the returned actions is what gets the generator
# called again.
{[{:event, {:output, %VoiceActivityChanged{voice_activity: :speech}}}], 1}

{state, chunk_size} when state <= length(chunks) ->
IO.puts("Generator called with state #{state}, size: #{chunk_size}")
# State n maps to the (n - 1)-th chunk because state 0 was used for the event.
chunk = Enum.at(chunks, state - 1)
IO.puts("Chunk size: #{byte_size(chunk)}")

# NOTE(review): this branch returns a bare action list, unlike the other
# branches which return `{actions, new_state}`; the author acknowledges this
# as an error later in the thread. The state is also never advanced here.
[{:buffer, {:output, %Buffer{payload: chunk}}}]

{state, chunk_size} ->
IO.puts("Generator called with final state #{state}, size: #{chunk_size}")
# Past the last chunk: no actions, state unchanged.
{[], state}
end
end

# Linear pipeline: testing source -> speech-to-text element under test -> testing sink.
links = [
child(:source, %Source{
# {initial_state, generator}: the generator starts with state 0.
output: {0, source_generator},
stream_format: %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 1}
})
|> child(:speech_to_text, %SpeechToText{
provider: :fireworksai,
personality_type: :test,
transcribe_options: [
temperature: 0.5,
language: "en",
prompt: "This is a transcript of an interview conversation. Add proper punctuation and capitalization."
]
})

|> child(:sink, %Sink{})
]

# Start the testing pipeline with the spec above.
pipeline = Pipeline.start_link_supervised!(spec: links)
The only output I see in my test:
Number of chunks: 313
Source generator called with state 0, size: 400
Generator called with state 0, size: 400
Speech detected
Number of chunks: 313
Source generator called with state 0, size: 400
Generator called with state 0, size: 400
Speech detected
So I found the error where I'm not returning the correct tuple {[actions], new_state} -- but even after fixing that, my generator is only getting called once. I'm wondering whether, since my element uses auto flow control, my test isn't properly simulating the demand. Backing up: I'm trying to simulate an audio pipeline where we pass a VAD speech event, which starts transcription, then the stream of audio, and finally a VAD silence event, which ends transcription. I assumed I had to use the generator to simulate the stream. Alternatively, I could do something like...
# Alternative to the generator: precompute the whole action list up front.
# One buffer action per audio chunk, each sent on the :output pad.
buffer_actions = Enum.map(chunks, fn chunk ->
{:buffer, {:output, %Buffer{payload: chunk}}}
end)

# Order: stream format, speech event, every audio buffer, then the silence event.
actions = [
{:stream_format, {:output, %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 1}}},
{:event, {:output, %VoiceActivityChanged{voice_activity: :speech}}}
] ++ buffer_actions ++ [
{:event, {:output, %VoiceActivityChanged{voice_activity: :silence}}}
]
# Alternative to the generator: precompute the whole action list up front.
# One buffer action per audio chunk, each sent on the :output pad.
buffer_actions = Enum.map(chunks, fn chunk ->
{:buffer, {:output, %Buffer{payload: chunk}}}
end)

# Order: stream format, speech event, every audio buffer, then the silence event.
actions = [
{:stream_format, {:output, %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 1}}},
{:event, {:output, %VoiceActivityChanged{voice_activity: :speech}}}
] ++ buffer_actions ++ [
{:event, {:output, %VoiceActivityChanged{voice_activity: :silence}}}
]
This does the same thing, though it seems like the buffers are all sent through at once. I can't tell whether it mimics what happens in a normal pipeline.
Feliks
Feliks2w ago
Add a `redemand: :output` action at the end of the list of actions returned by source_generator, and let me know if it helped.
TonyLikeSocks
TonyLikeSocksOP2w ago
Ah ha! Thanks @Feliks That did the trick.
Feliks
Feliks2w ago
FYI, the redemand action checks whether the whole output demand has already been satisfied. If not, it triggers the handle_demand callback once again. In this case, that means source_generator is executed once again.

Did you find this page helpful?