Pipeline Error: Pipeline Failed to Terminate within Timeout (5000ms)

This is a bit of a head-scratcher for me. I'm writing a new element for my pipeline that uses the Silero VAD model for speech detection (rather than the built-in WebRTC Engine VAD extension). I've got it working, but I'm hitting a weird bug. Now when my engine terminates (peer leaves), I get this error: ** (Membrane.PipelineError) Pipeline #PID<0.1499.0> hasn't terminated within given timeout (5000 ms). The only thing that has changed is my new element in the pipeline (it's set up as a Membrane.Filter). If I remove the element from the pipeline, the error goes away. I could obviously bump the timeout, but before I do that I thought I'd ask for advice. Why would adding an element increase the engine shutdown time? What's the right way to dig into this?
11 Replies
TonyLikeSocks (OP), 3mo ago
I'm loading the Ortex model in the element's init and storing it in the state: model = Ortex.load(@model_path). Beyond that, everything else seems like pretty straightforward logic / state management (if prob > threshold blah, etc.).
Feliks, 3mo ago
Termination of the whole pipeline is postponed until all of its bins and elements have terminated, so if your custom VAD element doesn't terminate when it should, it can delay termination of the whole pipeline.
TonyLikeSocks (OP), 3mo ago
How should I go about determining why my element is taking too long to terminate? I don’t have a handle_terminate callback defined.
Feliks, 3mo ago
You can just spawn a new Task in handle_init in your custom element. This task should Process.monitor the element and dbg(some message) after the element dies. Something like this should tell us if termination of this element is the problem or not
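A minimal sketch of the monitoring approach described above, in plain Elixir. The module name TerminationProbe, the watch/3 function and the {:probe_down, ...} message are hypothetical names made up for illustration; they are not part of Membrane. The watcher is started with Task.start/1 so that it is not linked to the element and cannot affect its shutdown.

```elixir
defmodule TerminationProbe do
  @moduledoc "Hypothetical helper: reports when a watched process exits."

  # Starts an unlinked task that monitors `pid` and prints how long it took
  # for the process to go down, counted from the moment the probe started.
  # `notify` is an optional pid that receives a message as well (handy in tests).
  def watch(pid, label, notify \\ nil) do
    Task.start(fn ->
      started = System.monotonic_time(:millisecond)
      ref = Process.monitor(pid)

      receive do
        {:DOWN, ^ref, :process, ^pid, reason} ->
          elapsed = System.monotonic_time(:millisecond) - started
          IO.puts("#{label} exited after #{elapsed} ms, reason: #{inspect(reason)}")
          if notify, do: send(notify, {:probe_down, label, reason, elapsed})
      end
    end)
  end
end

# Assumed usage inside the element (not taken from the gist):
#
#   def handle_init(_ctx, options) do
#     TerminationProbe.watch(self(), "silero_vad")
#     ...
#   end
```

If the message is printed well before the pipeline timeout fires, the element itself is probably not what is holding up termination; if it never appears, or appears only after the error, the element is the likely culprit.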
TonyLikeSocks (OP), 3mo ago
I didn't do that, but I did just try:
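@impl true
def handle_terminate_request(_ctx, state) do
  start_time = System.monotonic_time(:millisecond)
  IO.puts("========== VAD terminating ==========")

  # Rebind the result: Map.delete/2 returns a new map and leaves `state` untouched.
  state =
    state
    |> Map.delete(:model)
    |> Map.delete(:audio_buffer)
    |> Map.delete(:lstm)

  :erlang.garbage_collect()

  end_time = System.monotonic_time(:millisecond)
  IO.puts("VAD terminated in #{end_time - start_time}ms")

  # Note: this returns no actions, which overrides Membrane's default
  # callback (the default returns a `terminate: :normal` action).
  {[], state}
end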
I get a VAD shutdown time of 0ms, and then 5000ms later I get the error message. Unless this test isn't reliable.
Ahh. Interesting, I did your method and got an entirely different result.
Feliks, 3mo ago
handle_terminate_request is executed while the element process is still alive, and an element might live for a while after this callback returns.
"Ahh. Interesting, I did your method and got an entirely different result."
That probably means the new element is responsible for the pipeline termination delay. It's hard for me to say more without seeing the code of the whole element.
TonyLikeSocks (OP), 3mo ago
08:59:34.478 [error] GenServer "ac02dc54-14ba-4791-86c9-fc45bd38ea32" terminating
** (Membrane.PipelineError) Pipeline #PID<0.1504.0> hasn't terminated within given timeout (5000 ms).
If you want to kill it anyway, use `force?: true` option.


Last message: :terminate_request, metadata: line=1391 pid=<0.1500.0> file=gen_server.erl domain=otp mfa=:gen_server.error_info/8 room_id=ac02dc54-14ba-4791-86c9-fc45bd38ea32
Termination time: 2024-10-03 12:59:34.480162Z
[lib/smartvox/endpoints/conversation/vad.ex:59: Smartvox.Endpoints.Conversation.SileroVAD.handle_init/2]
binding() #=> [
_ctx: %{
name: :silero_vad,
resource_guard: #PID<0.1636.0>,
pads: %{},
playback: :stopped,
clock: nil,
utility_supervisor: #PID<0.1634.0>,
parent_clock: #PID<0.1510.0>
},
_pid: #PID<0.1635.0>,
element_pid: #PID<0.1635.0>,
options: %Smartvox.Endpoints.Conversation.SileroVAD{
threshold: 0.5,
min_speech_duration: 0.05,
min_silence_duration: 0.25,
smoothing_factor: 0.7
},
reason: :shutdown,
ref: #Reference<0.3075342379.1387790343.180566>,
state: %{
chunk_size: nil,
threshold: 0.5,
min_speech_duration: 0.05,
min_silence_duration: 0.25,
smoothing_factor: 0.7,
model: nil,
audio_buffer: "",
lstm: #Nx.Tensor<
f32[2][1][128]
[
[
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...]
],
...
]
>,
last_vad_state: :silence,
speech_duration: 0.0,
silence_duration: 0.0,
last_prob: 0
}
]
TonyLikeSocks (OP), 3mo ago
Here's a gist with the module. I was thinking of open sourcing it, if I could get it stable. https://gist.github.com/Tonyhaenn/66c9148b2ae73a5009894250a0b6f6d7
TonyLikeSocks (OP), 3mo ago
It's stable insofar as it works. The delayed shutdown is just puzzling. I'm guessing it's Ortex and its use of Rustler, though I've looked through those docs and the code and didn't see an "unload" or cleanup-type function. Tellingly, if I replace my do_predict/3 function with a dummy function like so:
defp do_predict(_model, _model_state, _audio) do
  # Stub: skips the Ortex.run/2 call and returns a fixed probability and a placeholder state.
  {0.5, "dummy_state"}
end
I don't get the same timeout failure. So it's definitely that Ortex.run/2 call.
Feliks, 3mo ago
It seems plausible that your element enters Ortex.run and stays there long enough for the default timeout in Pipeline.terminate to be exceeded.
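One way to check a hypothesis like this is to ask the VM what a still-alive process is currently executing. The sketch below uses Process.info/2 with :current_stacktrace, which is part of Elixir's standard library; the StuckProbe module and the sleeping stand-in process are hypothetical, and the thread does not confirm whether the stacktrace would show anything useful for a call that is inside native (Rustler) code.

```elixir
defmodule StuckProbe do
  @moduledoc "Hypothetical helper: shows where a live process currently is."

  # Returns {:alive, frames} with the process's current stacktrace,
  # or :dead if the process no longer exists.
  def where(pid) do
    case Process.info(pid, :current_stacktrace) do
      {:current_stacktrace, frames} -> {:alive, frames}
      nil -> :dead
    end
  end
end

# Stand-in for an element that is blocked in a long-running call.
stuck = spawn(fn -> Process.sleep(:infinity) end)
Process.sleep(50)

case StuckProbe.where(stuck) do
  {:alive, frames} -> IO.puts(Exception.format_stacktrace(frames))
  :dead -> IO.puts("process already exited")
end

Process.exit(stuck, :kill)
```

In the real application the pid would be the element's pid (for example the element_pid captured in handle_init), inspected after the terminate request has been sent but before the timeout expires.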
TonyLikeSocks (OP), 3mo ago
The inference time is ~1 ms according to my logging.
