Issue Membrane Upgrade to 1.1 (from 0.12.9)

I'm trying to go from Membrane 0.12.9 to 1.1 and hitting a wall. I followed the upgrade guide[1], and everything compiles successfully. Though I'm getting the following error in my pipeline:
19:50:34.066 [error] <0.4709.0>/{:endpoint, "conversation_endpoint"}/:opus_payloader/:header_generator Error occured in Membrane Element:
%UndefinedFunctionError{
module: Coerce.Implementations.Atom.Integer,
function: :coerce,
arity: 2,
reason: nil,
message: nil
}
Coerce.Implementations.Atom.Integer.coerce(nil, 48000)
(numbers 5.2.4) lib/numbers.ex:98: Numbers.mult/2
(membrane_rtp_plugin 0.24.1) lib/membrane/rtp/header_generator.ex:71: Membrane.RTP.HeaderGenerator.handle_buffer/4
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:139: Membrane.Core.CallbackHandler.exec_callback/4
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:69: Membrane.Core.CallbackHandler.exec_and_handle_callback/5
(elixir 1.15.6) lib/enum.ex:2510: Enum."-reduce/3-lists^foldl/2-0-"/3

...trimming error stack

Last message: {Membrane.Core.Message, :buffer, [%Membrane.Buffer{payload: <<72, 11, 239, 4, 254, 218, 15, 39, 69, 221, 236, 72, 21, 31, 80>>, pts: nil, dts: nil, metadata: %{duration: 20000000}}], [for_pad: :input]}, metadata: line=1373 pid=<0.4945.0> file=gen_server.erl domain=otp mfa=:gen_server.error_info/8 mb_prefix=<0.4709.0>/{:endpoint, "conversation_endpoint"}/:opus_payloader/:header_generator rtc_engine=4CE007265278E76558B69EA3354CCCF4 room_id=4CE007265278E76558B69EA3354CCCF4
19:50:34.066 [error] <0.4709.0>/{:endpoint, "conversation_endpoint"}/:opus_payloader/:header_generator Error occured in Membrane Element:
%UndefinedFunctionError{
module: Coerce.Implementations.Atom.Integer,
function: :coerce,
arity: 2,
reason: nil,
message: nil
}
Coerce.Implementations.Atom.Integer.coerce(nil, 48000)
(numbers 5.2.4) lib/numbers.ex:98: Numbers.mult/2
(membrane_rtp_plugin 0.24.1) lib/membrane/rtp/header_generator.ex:71: Membrane.RTP.HeaderGenerator.handle_buffer/4
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:139: Membrane.Core.CallbackHandler.exec_callback/4
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:69: Membrane.Core.CallbackHandler.exec_and_handle_callback/5
(elixir 1.15.6) lib/enum.ex:2510: Enum."-reduce/3-lists^foldl/2-0-"/3

...trimming error stack

Last message: {Membrane.Core.Message, :buffer, [%Membrane.Buffer{payload: <<72, 11, 239, 4, 254, 218, 15, 39, 69, 221, 236, 72, 21, 31, 80>>, pts: nil, dts: nil, metadata: %{duration: 20000000}}], [for_pad: :input]}, metadata: line=1373 pid=<0.4945.0> file=gen_server.erl domain=otp mfa=:gen_server.error_info/8 mb_prefix=<0.4709.0>/{:endpoint, "conversation_endpoint"}/:opus_payloader/:header_generator rtc_engine=4CE007265278E76558B69EA3354CCCF4 room_id=4CE007265278E76558B69EA3354CCCF4
I looked at the code in lib/membrane/rtp/header_generator.ex:71 and it seems to assume that buffer.pts won't be nil. Though you can see in the last message it's definitely nil, which I think it what's causing the error to throw. More details in thread. Any advice would be appreciated. [1] https://hexdocs.pm/membrane_core/1.1.0/v1-0-0.html
10 Replies
TonyLikeSocks
TonyLikeSocksOP8mo ago
Part of this upgrade involved going from Ratio 2.0 to 3.0, and that required changing this code:
# Transform the delay to the sample count
# bytes =
# delay
# |> Time.as_seconds()
# |> Ratio.mult(16_000)
# |> Ratio.ceil()
# # Each sample is 2 bytes
# |> Ratio.mult(2)
# # But we don't want to take more bytes than we have in the buffer
# |> min(byte_size(buffer))
# Transform the delay to the sample count
# bytes =
# delay
# |> Time.as_seconds()
# |> Ratio.mult(16_000)
# |> Ratio.ceil()
# # Each sample is 2 bytes
# |> Ratio.mult(2)
# # But we don't want to take more bytes than we have in the buffer
# |> min(byte_size(buffer))
to this code:
bytes =
delay
|> Time.as_seconds()
|> Ratio.new()
|> Ratio.mult(Ratio.new(16_000))
|> Ratio.ceil()
|> Ratio.new()
# Each sample is 2 bytes
|> Ratio.mult(Ratio.new(2))
# But we don't want to take more bytes than we have in the buffer
|> min(byte_size(buffer))
bytes =
delay
|> Time.as_seconds()
|> Ratio.new()
|> Ratio.mult(Ratio.new(16_000))
|> Ratio.ceil()
|> Ratio.new()
# Each sample is 2 bytes
|> Ratio.mult(Ratio.new(2))
# But we don't want to take more bytes than we have in the buffer
|> min(byte_size(buffer))
Which we then use here:
<<payload::binary-size(bytes), buffer::binary>> = buffer
payload = %Buffer{payload: payload}
{[buffer: {:output, payload}], %{state | buffer: buffer}}
<<payload::binary-size(bytes), buffer::binary>> = buffer
payload = %Buffer{payload: payload}
{[buffer: {:output, payload}], %{state | buffer: buffer}}
I've also got this error related to the LiveAudioMixer
19:50:34.069 [error] <0.4709.0>/{:endpoint, "recorder"}/:mixer Error occured in Membrane Element:
%Protocol.UndefinedError{
protocol: String.Chars,
value: #Reference<0.461582325.3695968264.22001>,
description: ""
}
(elixir 1.15.6) lib/string/chars.ex:3: String.Chars.impl_for!/1
(elixir 1.15.6) lib/string/chars.ex:22: String.Chars.to_string/1
(membrane_audio_mix_plugin 0.16.1) lib/membrane_live_audio_mixer/live_queue.ex:57: Membrane.LiveAudioMixer.LiveQueue.remove_queue/2
(membrane_audio_mix_plugin 0.16.1) lib/membrane_live_audio_mixer.ex:221: Membrane.LiveAudioMixer.handle_end_of_stream/3
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:139: Membrane.Core.CallbackHandler.exec_callback/4
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:69: Membrane.Core.CallbackHandler.exec_and_handle_callback/5
(membrane_core 1.1.0) lib/membrane/core/element/event_controller.ex:131: Membrane.Core.Element.EventController.exec_handle_event/4
(membrane_core 1.1.0) lib/membrane/core/element.ex:243: Membrane.Core.Element.handle_info/2
(stdlib 5.1.1) gen_server.erl:1077: :gen_server.try_handle_info/3
(stdlib 5.1.1) gen_server.erl:1165: :gen_server.handle_msg/6
(stdlib 5.1.1) proc_lib.erl:241: :proc_lib.init_p_do_apply/3

, metadata: line=207 pid=<0.4731.0> file=lib/membrane/core/element.ex domain=elixir application=membrane_core mfa=Membrane.Core.Element.handle_info/2 mb_prefix=<0.4709.0>/{:endpoint, "recorder"}/:mixer rtc_engine=4CE007265278E76558B69EA3354CCCF4
19:50:34.069 [error] <0.4709.0>/{:endpoint, "recorder"}/:mixer Error occured in Membrane Element:
%Protocol.UndefinedError{
protocol: String.Chars,
value: #Reference<0.461582325.3695968264.22001>,
description: ""
}
(elixir 1.15.6) lib/string/chars.ex:3: String.Chars.impl_for!/1
(elixir 1.15.6) lib/string/chars.ex:22: String.Chars.to_string/1
(membrane_audio_mix_plugin 0.16.1) lib/membrane_live_audio_mixer/live_queue.ex:57: Membrane.LiveAudioMixer.LiveQueue.remove_queue/2
(membrane_audio_mix_plugin 0.16.1) lib/membrane_live_audio_mixer.ex:221: Membrane.LiveAudioMixer.handle_end_of_stream/3
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:139: Membrane.Core.CallbackHandler.exec_callback/4
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:69: Membrane.Core.CallbackHandler.exec_and_handle_callback/5
(membrane_core 1.1.0) lib/membrane/core/element/event_controller.ex:131: Membrane.Core.Element.EventController.exec_handle_event/4
(membrane_core 1.1.0) lib/membrane/core/element.ex:243: Membrane.Core.Element.handle_info/2
(stdlib 5.1.1) gen_server.erl:1077: :gen_server.try_handle_info/3
(stdlib 5.1.1) gen_server.erl:1165: :gen_server.handle_msg/6
(stdlib 5.1.1) proc_lib.erl:241: :proc_lib.init_p_do_apply/3

, metadata: line=207 pid=<0.4731.0> file=lib/membrane/core/element.ex domain=elixir application=membrane_core mfa=Membrane.Core.Element.handle_info/2 mb_prefix=<0.4709.0>/{:endpoint, "recorder"}/:mixer rtc_engine=4CE007265278E76558B69EA3354CCCF4
This one has me stumped as well.
Feliks
Feliks8mo ago
What version of :membrane_audio_mix_plugin and :membrane_rtp_plugin do you use?
TonyLikeSocks
TonyLikeSocksOP8mo ago
I think I’m on latest for everything but the file plugin. I don’t explicitly pull in :membrane_rtp_plugin Membrane {:membrane_audio_mix_plugin, "~> 0.16.1"}, {:membrane_fake_plugin, "~> 0.11"}, {:membrane_ffmpeg_swresample_plugin, "~> 0.20.2"}, {:membrane_file_plugin, "~> 0.16"}, {:membrane_mp3_mad_plugin, "~> 0.18.3"}, {:membrane_mp4_plugin, "~> 0.34.0"}, {:membrane_opus_plugin, "~> 0.20.2"}, {:membrane_raw_audio_format, "~> 0.12.0"}, {:membrane_rtc_engine, "~> 0.22.0"}, {:membrane_rtc_engine_webrtc, "~> 0.8.0"}, {:membrane_tee_plugin, "~> 0.12"}, membrane_rtp_plugin ~> 0.24.1, which is a child dependency of membrane_rtc_engine ~> 0.22.0 and of membrane_rtc_engine_webrtc ~> 0.8.0
Feliks
Feliks8mo ago
19:50:34.069 [error] <0.4709.0>/{:endpoint, "recorder"}/:mixer Error occured in Membrane Element: <- does this log come from your custom endpoint? Maybe you lack some parser before mixer 🤔 Parser added before mixer may solve your problem, as it will set timestamps in the stream
TonyLikeSocks
TonyLikeSocksOP8mo ago
This is my setup:
def handle_setup(_context, state) do
log_path = Application.fetch_env!(:smartvox, :log_path)
File.mkdir(log_path)

spec = [
child(:mixer, %Membrane.LiveAudioMixer{
stream_format: %Membrane.RawAudio{
channels: 1,
sample_rate: 16_000,
sample_format: :s16le
}
})
|> child(:encoder, %Membrane.Opus.Encoder{
application: :voip,
input_stream_format: %Membrane.RawAudio{
channels: 1,
sample_rate: 16_000,
sample_format: :s16le
}
})
|> child(:parser, Membrane.Opus.Parser)
|> child({:muxer, state.room_id}, Membrane.MP4.Muxer.ISOM)
|> child({:sink, state.room_id}, %Membrane.File.Sink{
location: "#{log_path}/#{state.room_id}.mp4"
})
]

{[spec: spec], state}
end

def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
track = state.tracks[track_id]

spec = [
bin_input(pad)
|> child({:track_receiver, track_id}, Smartvox.Endpoints.Conversation.TrackRecevier)
|> child({:depayloader, track_id}, Track.get_depayloader(track))
|> child({:decoder, track_id}, %Membrane.Opus.Decoder{
sample_rate: 16_000,
})
|> via_in(:input)
|> get_child(:mixer)
]

{[spec: spec], state}
end
def handle_setup(_context, state) do
log_path = Application.fetch_env!(:smartvox, :log_path)
File.mkdir(log_path)

spec = [
child(:mixer, %Membrane.LiveAudioMixer{
stream_format: %Membrane.RawAudio{
channels: 1,
sample_rate: 16_000,
sample_format: :s16le
}
})
|> child(:encoder, %Membrane.Opus.Encoder{
application: :voip,
input_stream_format: %Membrane.RawAudio{
channels: 1,
sample_rate: 16_000,
sample_format: :s16le
}
})
|> child(:parser, Membrane.Opus.Parser)
|> child({:muxer, state.room_id}, Membrane.MP4.Muxer.ISOM)
|> child({:sink, state.room_id}, %Membrane.File.Sink{
location: "#{log_path}/#{state.room_id}.mp4"
})
]

{[spec: spec], state}
end

def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
track = state.tracks[track_id]

spec = [
bin_input(pad)
|> child({:track_receiver, track_id}, Smartvox.Endpoints.Conversation.TrackRecevier)
|> child({:depayloader, track_id}, Track.get_depayloader(track))
|> child({:decoder, track_id}, %Membrane.Opus.Decoder{
sample_rate: 16_000,
})
|> via_in(:input)
|> get_child(:mixer)
]

{[spec: spec], state}
end
Would you add a parser after the opus decoder in the handle_pad_added callback?
Feliks
Feliks8mo ago
put |> child(:audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true}) between opus decoder and audio mixer And add {:membrane_raw_audio_parser_plugin, "~> 0.4.0"} to deps Let me know if it changed anything
TonyLikeSocks
TonyLikeSocksOP8mo ago
%Membrane.ParentError{
message: "Duplicated names in children specification: [:audio_parser]"
}
%Membrane.ParentError{
message: "Duplicated names in children specification: [:audio_parser]"
}
I added the new element as part of the spec defined in handle_pad_added. However, I get the error above.
@impl true
def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
track = state.tracks[track_id]

spec = [
bin_input(pad)
|> child({:track_receiver, track_id}, Smartvox.Endpoints.Conversation.TrackRecevier)
|> child({:depayloader, track_id}, Track.get_depayloader(track))
|> child({:decoder, track_id}, %Membrane.Opus.Decoder{
sample_rate: 16_000
})
|> child(:audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true})
|> via_in(:input)
|> get_child(:mixer)
]

{[spec: spec], state}
end
@impl true
def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
track = state.tracks[track_id]

spec = [
bin_input(pad)
|> child({:track_receiver, track_id}, Smartvox.Endpoints.Conversation.TrackRecevier)
|> child({:depayloader, track_id}, Track.get_depayloader(track))
|> child({:decoder, track_id}, %Membrane.Opus.Decoder{
sample_rate: 16_000
})
|> child(:audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true})
|> via_in(:input)
|> get_child(:mixer)
]

{[spec: spec], state}
end
Feliks
Feliks8mo ago
Change |> child(:audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true}) on |> child({:audio_parser, track_id}, %Membrane.RawAudioParser{overwrite_pts?: true})
TonyLikeSocks
TonyLikeSocksOP8mo ago
Thank you. That got us past that error, but now I see a new one:
20:05:56.275 [error] <0.1543.0>/{:endpoint, "recorder"}/:mixer Error occured in Membrane Element:
%Protocol.UndefinedError{
protocol: String.Chars,
value: #Reference<0.2482563602.4096262146.236672>,
description: ""
}
(elixir 1.15.6) lib/string/chars.ex:3: String.Chars.impl_for!/1
(elixir 1.15.6) lib/string/chars.ex:22: String.Chars.to_string/1
(membrane_audio_mix_plugin 0.16.1) lib/membrane_live_audio_mixer/live_queue.ex:57: Membrane.LiveAudioMixer.LiveQueue.remove_queue/2
(membrane_audio_mix_plugin 0.16.1) lib/membrane_live_audio_mixer.ex:221: Membrane.LiveAudioMixer.handle_end_of_stream/3
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:139: Membrane.Core.CallbackHandler.exec_callback/4
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:69: Membrane.Core.CallbackHandler.exec_and_handle_callback/5
(membrane_core 1.1.0) lib/membrane/core/element/event_controller.ex:131: Membrane.Core.Element.EventController.exec_handle_event/4
(membrane_core 1.1.0) lib/membrane/core/element.ex:243: Membrane.Core.Element.handle_info/2
(stdlib 5.1.1) gen_server.erl:1077: :gen_server.try_handle_info/3
(stdlib 5.1.1) gen_server.erl:1165: :gen_server.handle_msg/6
(stdlib 5.1.1) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
20:05:56.275 [error] <0.1543.0>/{:endpoint, "recorder"}/:mixer Error occured in Membrane Element:
%Protocol.UndefinedError{
protocol: String.Chars,
value: #Reference<0.2482563602.4096262146.236672>,
description: ""
}
(elixir 1.15.6) lib/string/chars.ex:3: String.Chars.impl_for!/1
(elixir 1.15.6) lib/string/chars.ex:22: String.Chars.to_string/1
(membrane_audio_mix_plugin 0.16.1) lib/membrane_live_audio_mixer/live_queue.ex:57: Membrane.LiveAudioMixer.LiveQueue.remove_queue/2
(membrane_audio_mix_plugin 0.16.1) lib/membrane_live_audio_mixer.ex:221: Membrane.LiveAudioMixer.handle_end_of_stream/3
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:139: Membrane.Core.CallbackHandler.exec_callback/4
(membrane_core 1.1.0) lib/membrane/core/callback_handler.ex:69: Membrane.Core.CallbackHandler.exec_and_handle_callback/5
(membrane_core 1.1.0) lib/membrane/core/element/event_controller.ex:131: Membrane.Core.Element.EventController.exec_handle_event/4
(membrane_core 1.1.0) lib/membrane/core/element.ex:243: Membrane.Core.Element.handle_info/2
(stdlib 5.1.1) gen_server.erl:1077: :gen_server.try_handle_info/3
(stdlib 5.1.1) gen_server.erl:1165: :gen_server.handle_msg/6
(stdlib 5.1.1) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
Feliks
Feliks8mo ago
Change ... |> via_in(:input) |> get_child(:mixer) in spec on ... |> via_in(Pad.ref(:input, track_id)) |> get_child(:mixer) or eventually ... |> via_in(Pad.ref(:input, inspect(track_id))) |> get_child(:mixer)

Did you find this page helpful?