Logs are overrun with `Sending stream format through pad` messages. Am I doing something wrong?

I have a simple pipeline for processing text. All elements in the pipeline use `flow_control: :auto`, except the source, which uses `flow_control: :push`. Every element except the source logs thousands of messages like this:
```
app-1 | [debug] <0.3808.0>/:some_name Sending stream format through pad :output
app-1 | Stream format: %Membrane.RemoteStream{content_format: nil, type: :bytestream}
app-1 | [debug] <0.3808.0>/:some_name Sending stream format through pad :output
app-1 | Stream format: %Membrane.RemoteStream{content_format: nil, type: :bytestream}
```
The source logs it only once. This happens even when no actual data is flowing through the pipeline. I'm looking for guidance on:
- Is this expected? Is `handle_action` being called continuously without any data being sent (ref)? Or am I doing something woefully wrong?
- Is there any way I can reduce the log output for it? Other useful debug messages get washed out because this produces 100x more volume.
(ref) GitHub: membrane_core/lib/membrane/core/element/action_handler.ex at ac6b79...
Sameer (OP), 3w ago
I tried using `flow_control: :push` across the board, and that didn't change the log output.
varsill, 3w ago
Hello!

1) Sending the stream format that frequently is almost certainly not expected behaviour. I suspect that the first element in the pipeline behind the source somehow "duplicates" the stream format (by returning it as an action in one of its callbacks that gets called frequently). Later on, the stream formats are just passed through the other elements, and since there are already a lot of them, you will see a lot of logs from the other elements too. Could you tell me which element sits right behind your source element? Generally speaking, the stream format is expected to be sent once, before the first buffer (and possibly again later on if the format changes, for instance when the resolution of your video changes).

2) First of all, we should figure out why the stream formats are sent that frequently, but there are a couple of things you can do if you want to preserve your logs:
* you can play around with the logger limits: https://www.erlang.org/doc/apps/kernel/logger_chapter.html#message-queue-length, set them high enough for the logs not to be washed out, and then redirect the output to a file
* the simplest solution would be to remove the problematic log (https://github.com/membraneframework/membrane_core/blob/ac6b793e2ea46316af569735c63dade1d83f6ddf/lib/membrane/core/element/action_handler.ex#L366 ) in the membrane_core dependency and recompile it with `MIX_ENV=<your env> mix deps.compile membrane_core` 😉
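If patching the dependency is undesirable, an Erlang logger primary filter can drop just this one message while leaving other debug logs intact. This is a minimal sketch, assuming the Membrane log reaches `:logger` as a `{:string, chardata}` message (events in any other shape pass through untouched); `StreamFormatLogFilter` and the `:drop_stream_format_logs` filter id are hypothetical names.

```elixir
defmodule StreamFormatLogFilter do
  # Hypothetical helper: drops log events whose text contains the noisy
  # Membrane debug message and lets every other event through unchanged.
  @noisy "Sending stream format through pad"

  # Filter callback for :logger.add_primary_filter/2.
  # Returning :stop drops the event; returning the event keeps it.
  def filter(%{msg: {:string, chardata}} = event, _extra) do
    if String.contains?(IO.chardata_to_string(chardata), @noisy) do
      :stop
    else
      event
    end
  end

  # Events logged as {:report, _} or {format, args} are not inspected.
  def filter(event, _extra), do: event

  # Registers the filter globally; call it once at startup.
  def install do
    :logger.add_primary_filter(:drop_stream_format_logs, {&__MODULE__.filter/2, []})
  end
end
```

Calling `StreamFormatLogFilter.install()` once at startup (for example from `Application.start/2`) registers the filter, and `:logger.remove_primary_filter(:drop_stream_format_logs)` removes it again. Matching on message text breaks silently if the wording changes between Membrane versions, so this is a stopgap while the real cause is tracked down.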
Sameer (OP), 3w ago
Hi Lukasz, thanks! I suspected I was doing something wrong 🙂 Only the source element defines a `handle_playing` callback:
```elixir
@impl true
def handle_playing(_ctx, state) do
  # ...Doing some work....
  {[stream_format: {:output, %Membrane.RemoteStream{}}], state}
end
```
The source receives messages in its `handle_parent_notification` callback and returns `{[buffer: {:output, buffer}], state}` to pass payloads to the next element. The element right after the source in the pipeline queries an LLM. Its code is below:
```elixir
defmodule LLM do
  use Membrane.Filter
  require Membrane.Logger

  def_input_pad(:input, accepted_format: _any, flow_control: :auto)
  def_output_pad(:output, accepted_format: _any, flow_control: :auto)
  # Receives control events from :tts (the Control Flow link in the pipeline)
  def_input_pad(:control, accepted_format: _any, flow_control: :auto)

  def_options(
    chat_context: [
      spec: [ChatMessage.t()],
      description: "Default chat context to seed LLM with."
    ]
  )

  @impl true
  def handle_init(_ctx, options) do
    # Seed the state with the chat context: the callbacks below update it with
    # %{state | chat_context: ...}, which raises a KeyError if the key is absent.
    {[], %{chat_context: options.chat_context}}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    # ....
    case sync_api do
      true ->
        # Synchronous call: replace the payload with the LLM's reply
        {:ok, content} = ....
        buffer = %Membrane.Buffer{buffer | payload: content}
        {[buffer: {:output, buffer}], %{state | chat_context: chat_context}}

      false ->
        # Streaming call: chunks arrive later via handle_info/3
        # .....
        {[], %{state | chat_context: chat_context}}
    end
  end

  @impl true
  def handle_info({:streaming_response, {:text, response}}, _ctx, state) do
    buffer = %Membrane.Buffer{payload: response}
    {[buffer: {:output, buffer}], state}
  end

  def handle_info({:streaming_done}, _ctx, state) do
    {[], state}
  end

  @impl true
  def handle_event(:control, %MyMsg{message: message}, _ctx, state) do
    # ....
    {[], %{state | ....}}
  end
end
```
It's the element that shows up in the logs:
```
:llm Sending stream format through pad :output
app-1 | Stream format: %Membrane.RemoteStream{content_format: nil, type: :bytestream}
:llm Sending stream format through pad :output
app-1 | Stream format: %Membrane.RemoteStream{content_format: nil, type: :bytestream}
```
Another fact that may be relevant: the pipeline is defined like this:
```elixir
defmodule MyPipeline do
  use Membrane.Pipeline

  require Membrane.Logger

  @impl true
  def handle_init(_ctx, config) do
    spec = [
      # Data Flow
      child(:source, %MySource{
        ...
      })
      |> child(:llm, %LLM{
        chat_context: config.chat_context
      })
      |> child(:tts, %MyTTS{
        ...
      })
      |> child(:sink, %Membrane.Debug.Sink{handle_buffer: &IO.puts(&1.payload)}),

      # Control Flow: :tts sends events back upstream to :llm's :control pad
      get_child(:tts)
      |> via_out(:control)
      |> via_in(:control)
      |> get_child(:llm)
    ]

    {[spec: spec], %{}}
  end

  @impl true
  def handle_info(%{user_message: chat_message}, _ctx, state) do
    {[notify_child: {:source, %{user_message: chat_message}}], state}
  end
end
```
Removing the Control Flow section and the control pads stops the logs from continuously printing `Sending stream format through pad`. So, is sending messages upstream in the pipeline like this not recommended? Or have I configured something incorrectly?

After following @Feliks's advice and simplifying the pipeline (by dropping the control flow section), `send_stream_format` is no longer called continuously and the logs aren't overrun.
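If `:llm` still needs to hear from `:tts` after the `:control` link is gone, one option is to relay those messages through the pipeline with notifications, the same mechanism `MyPipeline.handle_info/3` already uses to reach `:source`. This is a sketch under assumptions, not the approach from the thread: it assumes `:tts` returns `{[notify_parent: {:control, msg}], state}` and that `LLM` implements a matching `handle_parent_notification/3`; the module name and the `{:control, msg}` shape are hypothetical.

```elixir
defmodule ControlRelayPipelineSketch do
  # Hypothetical pipeline fragment: control messages from :tts reach :llm via
  # the pipeline process instead of a :control pad link, so there is no second
  # path along which stream formats can travel back upstream.
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _options) do
    # The data-flow spec (source -> llm -> tts -> sink) would go here.
    {[], %{}}
  end

  # Assumes :tts emits {[notify_parent: {:control, msg}], state} and LLM has
  # handle_parent_notification({:control, msg}, ctx, state) to receive it.
  @impl true
  def handle_child_notification({:control, msg}, :tts, _ctx, state) do
    {[notify_child: {:llm, {:control, msg}}], state}
  end

  # Ignore every other child notification.
  def handle_child_notification(_notification, _child, _ctx, state) do
    {[], state}
  end
end
```

The trade-off is an extra hop through the pipeline process for each control message, in exchange for a linear data path with no pads between `:tts` and `:llm` other than the media link.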
varsill, 3w ago
Great to hear that it no longer produces those stream_format logs! The reason you observed this strange behaviour is that, with the additional "bypass" link from filter3 to filter1, the stream format was duplicated in filter3. The default implementation of `handle_stream_format` returns the `:forward` action, which sends the stream format on ALL available output pads. In your case, it was sending it both to the subsequent filter (as expected) and to filter1 through your "bypass" link. Once received by filter1, it was sent again to filter2 and filter3, which started the "loop" 😉
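If you want to keep the control link instead of removing it, another way out of the loop is to override `handle_stream_format/4` in the element that owns the extra output pad, so it sends the format only on its media output rather than relying on the default forward-to-all behaviour. A minimal sketch: `MyTTSSketch` is a hypothetical stand-in for the thread's `MyTTS`, and its pads are assumed to mirror the `:control` link in the pipeline.

```elixir
defmodule MyTTSSketch do
  # Hypothetical stand-in for MyTTS: only the pads and the stream-format
  # handling are shown; the text-to-speech work itself is omitted.
  use Membrane.Filter

  def_input_pad(:input, accepted_format: _any, flow_control: :auto)
  def_output_pad(:output, accepted_format: _any, flow_control: :auto)
  # Assumed to carry only control events back to :llm, never buffers.
  def_output_pad(:control, accepted_format: _any, flow_control: :auto)

  @impl true
  def handle_init(_ctx, _options), do: {[], %{}}

  # The default implementation forwards the stream format to every output
  # pad, :control included, which feeds it back into :llm. Sending it only
  # on :output keeps it flowing downstream to :sink and nowhere else.
  @impl true
  def handle_stream_format(:input, stream_format, _ctx, state) do
    {[stream_format: {:output, stream_format}], state}
  end

  # Pass-through placeholder for the real TTS processing.
  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    {[buffer: {:output, buffer}], state}
  end
end
```

The loop could also be cut on the receiving side: once `LLM` defines its own `handle_stream_format/4`, a `:control` clause returning `{[], state}` drops the format there, but an `:input` clause (for example returning `{[stream_format: {:output, stream_format}], state}`) is then needed too, because overriding the callback replaces the default for every pad.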
Sameer (OP), 3w ago
ah gotcha, thx for the explanation 👍
