Sameer
Sameer
SMSoftware Mansion
Created by enzoqtvf on 1/10/2025 in #membrane-help
How to split a Raw Audio Buffer with 2 channels within frame into two different buffer
Could you share what the pipeline looks like ? It will help anyone else who has a similar question in the future 🙂
3 replies
SMSoftware Mansion
Created by Sameer on 1/5/2025 in #membrane-help
How to send control events upstream/downstream ?
It also resolved the other problem I introduced 🤭
7 replies
SMSoftware Mansion
Created by Sameer on 1/5/2025 in #membrane-help
Logs are overrun with `Sending stream format through pad` messages. Am I doing something wrong ?
ah gotcha, thx for the explanation 👍
9 replies
SMSoftware Mansion
Created by Sameer on 1/5/2025 in #membrane-help
Logs are overrun with `Sending stream format through pad` messages. Am I doing something wrong ?
After following @Feliks 's advice and simplifying the pipeline (by dropping the control flow section), send_stream_format is not being called continuously and the logs arent being overrun.
9 replies
SMSoftware Mansion
Created by Sameer on 1/5/2025 in #membrane-help
How to send control events upstream/downstream ?
Thanks a lot @Feliks Thats even cleaner !!
7 replies
SMSoftware Mansion
Created by Sameer on 1/5/2025 in #membrane-help
Logs are overrun with `Sending stream format through pad` messages. Am I doing something wrong ?
Maybe another fact to mention, the pipeline is defined like
defmodule MyPipeline do
use Membrane.Pipeline

require Membrane.Logger
@impl true
def handle_init(_ctx, config) do

spec = [
{ #Data Flow
child(:source, %MySource{
...
})
|> child(:llm,%LLM{
chat_context: config.chat_context
})
|> child(:tts,%MyTTS{
...
})
|> child(:sink,%Membrane.Debug.Sink{handle_buffer: &IO.puts(&1.payload)}),
},
{#Control Flow
get_child(:tts)
|> via_out(:control)
|> via_in(:control)
|> get_child(:llm),
}]
{[spec: spec], %{}}
end

@impl true
def handle_info(%{user_message: chat_message}, _ctx, state) do
{[notify_child: {:source, %{user_message: chat_message}}], state}

end

end
defmodule MyPipeline do
use Membrane.Pipeline

require Membrane.Logger
@impl true
def handle_init(_ctx, config) do

spec = [
{ #Data Flow
child(:source, %MySource{
...
})
|> child(:llm,%LLM{
chat_context: config.chat_context
})
|> child(:tts,%MyTTS{
...
})
|> child(:sink,%Membrane.Debug.Sink{handle_buffer: &IO.puts(&1.payload)}),
},
{#Control Flow
get_child(:tts)
|> via_out(:control)
|> via_in(:control)
|> get_child(:llm),
}]
{[spec: spec], %{}}
end

@impl true
def handle_info(%{user_message: chat_message}, _ctx, state) do
{[notify_child: {:source, %{user_message: chat_message}}], state}

end

end
Removing the section Control Flow and removing the control pads results in the logs not continuously outputting Sending stream format through pad. So, am I doing something thats not recommended to send messages upstream in the pipeline ? Or I haven't configured things correctly ?
9 replies
SMSoftware Mansion
Created by Sameer on 1/5/2025 in #membrane-help
Logs are overrun with `Sending stream format through pad` messages. Am I doing something wrong ?
The element thats right after the source in the pipeline is one that queries an LLM. Pasted the code for it below.
defmodule LLM do
use Membrane.Filter
require Membrane.Logger

def_input_pad(:input, accepted_format: _any,flow_control: :auto)
def_output_pad(:output,accepted_format: _any,flow_control: :auto)
def_input_pad(:control,accepted_format: _any,flow_control: :auto)

def_options(
chat_context: [
spec: [ChatMessage.t()],
description: "Default chat context to seed LLM with."
]
)

@impl true
def handle_init(_ctx, options) do
{[],%{}}
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) do
....
case sync_api do
true ->
{:ok, content} = ....
buffer = %Membrane.Buffer{
buffer | payload: content
}
{[buffer: {:output, buffer}], %{
state | chat_context: chat_context
}}
false ->
.....
{[], %{state |
chat_context: chat_context
}}
end

end

@impl true
def handle_info({:streaming_response, {:text, response}}, _ctx, state) do
buffer = %Membrane.Buffer{
payload: response,
}
{[buffer: {:output, buffer}], state}
end

def handle_info({:streaming_done}, _ctx, state) do
{[], state}
end

@impl true
def handle_event(:control, %MyMsg{message: message}, _ctx, state) do
....
{[], %{state | ....}}
end

end
defmodule LLM do
use Membrane.Filter
require Membrane.Logger

def_input_pad(:input, accepted_format: _any,flow_control: :auto)
def_output_pad(:output,accepted_format: _any,flow_control: :auto)
def_input_pad(:control,accepted_format: _any,flow_control: :auto)

def_options(
chat_context: [
spec: [ChatMessage.t()],
description: "Default chat context to seed LLM with."
]
)

@impl true
def handle_init(_ctx, options) do
{[],%{}}
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) do
....
case sync_api do
true ->
{:ok, content} = ....
buffer = %Membrane.Buffer{
buffer | payload: content
}
{[buffer: {:output, buffer}], %{
state | chat_context: chat_context
}}
false ->
.....
{[], %{state |
chat_context: chat_context
}}
end

end

@impl true
def handle_info({:streaming_response, {:text, response}}, _ctx, state) do
buffer = %Membrane.Buffer{
payload: response,
}
{[buffer: {:output, buffer}], state}
end

def handle_info({:streaming_done}, _ctx, state) do
{[], state}
end

@impl true
def handle_event(:control, %MyMsg{message: message}, _ctx, state) do
....
{[], %{state | ....}}
end

end
Its the element thats in the logs
:llm Sending stream format through pad :output
app-1 | Stream format: %Membrane.RemoteStream{content_format: nil, type: :bytestream}
:llm Sending stream format through pad :output
app-1 | Stream format: %Membrane.RemoteStream{content_format: nil, type: :bytestream}
9 replies
SMSoftware Mansion
Created by Sameer on 1/5/2025 in #membrane-help
Logs are overrun with `Sending stream format through pad` messages. Am I doing something wrong ?
Hi Lukasz Thanks, I suspected I was doing something wrong 🙂 Only the source element has a handle_playing method defined as
@impl true
def handle_playing(_ctx, state) do
...Doing some work....
{[stream_format: {:output, %Membrane.RemoteStream{}}], state}
end
@impl true
def handle_playing(_ctx, state) do
...Doing some work....
{[stream_format: {:output, %Membrane.RemoteStream{}}], state}
end
It receives a handle_parent_notification callback and returns {[buffer: {:output, buffer}], state} to pass payloads to the next element.
9 replies
SMSoftware Mansion
Created by Sameer on 1/5/2025 in #membrane-help
Logs are overrun with `Sending stream format through pad` messages. Am I doing something wrong ?
I tried using flow_control with push across the board and that didn't seem to change the log output.
9 replies
SMSoftware Mansion
Created by Sameer on 1/5/2025 in #membrane-help
How to send control events upstream/downstream ?
Figured it out after your help and doing some more reading - https://hexdocs.pm/membrane_core/Membrane.ChildrenSpec.html#t:t/0 In case someone else needs to figure this out too, it involves creating a separate link in the pipeline spec with
get_child(:filter 3)
|> via_out(:control)
|> via_in(:control)
|> get_child(:filter 1),
get_child(:filter 3)
|> via_out(:control)
|> via_in(:control)
|> get_child(:filter 1),
7 replies
SMSoftware Mansion
Created by Sameer on 1/5/2025 in #membrane-help
How to send control events upstream/downstream ?
Thanks @samrat Can you point me to how an element can send the event to an upstream element ? In the case Source A -> Filter 1 -> Filter 2 -> Filter 3 -> Sink X, how to define Filter 3 sending an event to Filter 1 ?
7 replies