Testing a Membrane Bin used in a WebRTC Engine Endpoint

I'm trying to test a membrane bin that I use in WebRTC engine endpoint. Though I'm having trouble getting my test setup properly. I was trying to approach is this way: 1. Start my Membrane.Testing.Pipeline with a simple spec, just the conversation bin 2. Send a {:new_tracks} event to the bin, simulating what the Membrane.WebRTC.Engine does when a new bin is assed as an endpoint 3. Then update the spec to connect the Membrane.Testing.DynamicSource and Membrane.Testing.Sink But I'm never tripping the handle_pad_added callback in my bin, which leads me to believe I'm doing something wrong. I'm using the Membrane.Testing.Pipeline so I can access the Membrane.Testing.Assertions and assert that my conversation bin is emitting the right events, etc. Is this the right approach for testing a custom bin used in the webrtc engine? Or is there a better / different approach? I checked out the integration tests in the repo, but none seemed to handle this exact case.
conversation_bin = %Custom.Bin{options: options}
options = [
spec: [
# Use tuple name
child({:endpoint, endpoint_id}, conversation_bin)
],
name: :conversation_pipeline
]

pipeline = Pipeline.start_link_supervised!(options)
Pipeline.notify_child(pipeline, {:endpoint, endpoint_id}, {:new_tracks, [track]})
updated_spec = [
child(:source, %DynamicSource{
output: {0, generator},
stream_format: @audio_format
})
|> via_in(pad)
|> get_child({:endpoint, endpoint_id}),
get_child({:endpoint, endpoint_id})
|> via_in(:input)
|> child(:sink, %Sink{})
]

# Execute the spec update to link the pads
Pipeline.execute_actions(pipeline, spec: updated_spec)
conversation_bin = %Custom.Bin{options: options}
options = [
spec: [
# Use tuple name
child({:endpoint, endpoint_id}, conversation_bin)
],
name: :conversation_pipeline
]

pipeline = Pipeline.start_link_supervised!(options)
Pipeline.notify_child(pipeline, {:endpoint, endpoint_id}, {:new_tracks, [track]})
updated_spec = [
child(:source, %DynamicSource{
output: {0, generator},
stream_format: @audio_format
})
|> via_in(pad)
|> get_child({:endpoint, endpoint_id}),
get_child({:endpoint, endpoint_id})
|> via_in(:input)
|> child(:sink, %Sink{})
]

# Execute the spec update to link the pads
Pipeline.execute_actions(pipeline, spec: updated_spec)
8 Replies
Feliks
Feliks3d ago
Hm, the code that you posted seems to look good. Could you share assertions that fail as well?
TonyLikeSocks
TonyLikeSocksOP3d ago
This is the assertion that's failing
assert_pipeline_notified(
pipeline,
{:endpoint, endpoint_id},
{:track_ready, ^track_id, :high, :opus},
5_000
)
assert_pipeline_notified(
pipeline,
{:endpoint, endpoint_id},
{:track_ready, ^track_id, :high, :opus},
5_000
)
I'm expecting this callback to be called:
# A pad for the track that we subscribed to was connected
@impl true
def handle_pad_added(Pad.ref(:input, track_id), _ctx, state) when state.track.id == track_id do
IO.puts("Pad added")

{
# Mark the response track as ready
[
notify_parent:
{:track_ready, state.out_track.id, hd(state.out_track.variants),
state.out_track.encoding}
],
state
}
end
# A pad for the track that we subscribed to was connected
@impl true
def handle_pad_added(Pad.ref(:input, track_id), _ctx, state) when state.track.id == track_id do
IO.puts("Pad added")

{
# Mark the response track as ready
[
notify_parent:
{:track_ready, state.out_track.id, hd(state.out_track.variants),
state.out_track.encoding}
],
state
}
end
I've even got a handle_pad_added without the guard
@impl true
def handle_pad_added(Pad.ref(:input, track_id), ctx, state) do
IO.puts("Unhandled pad added. #{inspect(track_id)}")
IO.puts("State: #{inspect(state)}")
IO.puts("Ctx: #{inspect(ctx)}")
{[], state}
end
@impl true
def handle_pad_added(Pad.ref(:input, track_id), ctx, state) do
IO.puts("Unhandled pad added. #{inspect(track_id)}")
IO.puts("State: #{inspect(state)}")
IO.puts("Ctx: #{inspect(ctx)}")
{[], state}
end
And it's never called either. I assumed the notify_parent would trigger a notification to the test pipeline and I could assert on it.
Feliks
Feliks3d ago
Do you implement handle_setup in any child that is spawned in your Pipeline?
TonyLikeSocks
TonyLikeSocksOP3d ago
Yes. I'm testing more, and the event returned from my handle_init I can assert on. But an event fired from this callback:
@impl true
def handle_parent_notification(
{:new_tracks, [%Track{type: :audio} = track]},
ctx,
%{track: nil} = state
) do
{:endpoint, endpoint_id} = ctx.name
# subscribe to track
IO.puts("Subscribing to track #{inspect(track.id)}")

:ok = Engine.subscribe(state.rtc_engine, endpoint_id, track.id)
out_track = out_track_spec(track, ctx)

# Publish new track
actions = [notify_parent: {:publish, {:new_tracks, [out_track]}}]
state = %{state | out_track: out_track, track: track}

{actions, state}
end
@impl true
def handle_parent_notification(
{:new_tracks, [%Track{type: :audio} = track]},
ctx,
%{track: nil} = state
) do
{:endpoint, endpoint_id} = ctx.name
# subscribe to track
IO.puts("Subscribing to track #{inspect(track.id)}")

:ok = Engine.subscribe(state.rtc_engine, endpoint_id, track.id)
out_track = out_track_spec(track, ctx)

# Publish new track
actions = [notify_parent: {:publish, {:new_tracks, [out_track]}}]
state = %{state | out_track: out_track, track: track}

{actions, state}
end
I can't assert on. Though I can see the IO.puts message in my logs, so I know the callback is being fired. Though the children for this bin aren't spawned until we handle this callback:
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
...stuff
end
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
...stuff
end
And the assertions that are failing right now are prior to that in the lifecycle.
Feliks
Feliks2d ago
Give me please the implementation of handle_setup that you implement. Maybe the execution of it doesn't end? Please check as well if handle_playing is executed in children of your pipeline.
TonyLikeSocks
TonyLikeSocksOP2d ago
I checked and the handle_setup isn't being called. It's one of the child elements of the bin. My hunch is that I'm setting the spec up wrong in my test file. My conversation bin doesn't actually return the child spec until we get the handle_pad_added callbacks. Here's the relevant code from the conversation.ex bin that I'm trying to test:
@impl true
def handle_pad_added(Pad.ref(:input, track_id), _ctx, state) when state.track.id == track_id do
IO.puts("Pad added")

{
# Mark the response track as ready
[
notify_parent:
{:track_ready, state.out_track.id, hd(state.out_track.variants),
state.out_track.encoding}
],
state
}
end

@impl true
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
IO.puts("Output pad added")

spec = [
bin_input(Pad.ref(:input, state.track.id))
...omitted
]

spec = {spec, log_metadata: [room_id: state.room_id]}

{[spec: spec], state}
end
@impl true
def handle_pad_added(Pad.ref(:input, track_id), _ctx, state) when state.track.id == track_id do
IO.puts("Pad added")

{
# Mark the response track as ready
[
notify_parent:
{:track_ready, state.out_track.id, hd(state.out_track.variants),
state.out_track.encoding}
],
state
}
end

@impl true
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
IO.puts("Output pad added")

spec = [
bin_input(Pad.ref(:input, state.track.id))
...omitted
]

spec = {spec, log_metadata: [room_id: state.room_id]}

{[spec: spec], state}
end
Since neither of those is tripped, I think I'm not setting my testing spec up correctly, or the Testing pipeline isn't correctly notifying the element that new pads have been added.
Feliks
Feliks2d ago
I see that: 1. You don't link any bin pad in handle_pad_added for :input pad 2. You link :input pad in handle_pad_added for :output pad But where do you link :output pad? I seems it is skipped
TonyLikeSocks
TonyLikeSocksOP3h ago
We setup the whole spec in the handle_pad_added for the output pad. I omitted it for brevity earlier. I
@impl true
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
IO.puts("Output pad added")

spec = [
bin_input(Pad.ref(:input, state.track.id))
|> child(:track_receiver, __MODULE__.TrackRecevier)
|> children...omitted for discord message limits
|> via_in(Pad.ref(:input, {state.out_track.id, :high}))
|> child(:track_sender, %TrackSender{
track: state.out_track,
variant_bitrates: %{high: 50_000}
})
|> via_out(Pad.ref(:output, {state.out_track.id, :high}))
|> bin_output(pad),
branch_spec(state),
branch_spec(state)
]

spec = {spec, log_metadata: [room_id: state.room_id]}

{[spec: spec], state}
end
@impl true
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
IO.puts("Output pad added")

spec = [
bin_input(Pad.ref(:input, state.track.id))
|> child(:track_receiver, __MODULE__.TrackRecevier)
|> children...omitted for discord message limits
|> via_in(Pad.ref(:input, {state.out_track.id, :high}))
|> child(:track_sender, %TrackSender{
track: state.out_track,
variant_bitrates: %{high: 50_000}
})
|> via_out(Pad.ref(:output, {state.out_track.id, :high}))
|> bin_output(pad),
branch_spec(state),
branch_spec(state)
]

spec = {spec, log_metadata: [room_id: state.room_id]}

{[spec: spec], state}
end
I think I realized at least part of the problem. My element is silently dying. When I send send in the :new_tracks notification, I'm also trying to subscribe to the track published by the engine. I've got this line:
:ok = Engine.subscribe(state.rtc_engine, endpoint_id, track.id)
:ok = Engine.subscribe(state.rtc_engine, endpoint_id, track.id)
But the state.rtc_engine is the pid of my testing process, not an actual running RTC engine. So that call silently fails, and nothing else is executed, which is why my handle_pad_added calls weren't firing. Adding this to my test, unblocked me. I'm successfully setting up the pipeline and mocking the RTC Engine pieces.
receive do
{:subscribe, {caller_pid, ref}, ^endpoint_id, received_track_id, _opts} ->
# Verify the track ID if needed
assert received_track_id == track.id,
"Track ID in subscribe message doesn't match expected track ID"
send(caller_pid, {ref, :ok})
after
5000 -> flunk("Did not receive :subscribe message from Conversation bin within timeout")
end
receive do
{:subscribe, {caller_pid, ref}, ^endpoint_id, received_track_id, _opts} ->
# Verify the track ID if needed
assert received_track_id == track.id,
"Track ID in subscribe message doesn't match expected track ID"
send(caller_pid, {ref, :ok})
after
5000 -> flunk("Did not receive :subscribe message from Conversation bin within timeout")
end
Thanks @Feliks ! FWIW - i think the failure is silent because my test assertions were using the default timeout of 2_000, but that Engine.subscribe call has a timeout of 5_000. So I never tripped the timeout and saw the error message. @Feliks So now my issue is that the first element in my bin is Membrane.RTC.Engine.Endpoint.WebRTC.TrackReceiver which has it's input pad flow_control: :push ; but the output pad of the Membrane.Testing.DynamicSource that I'm using has an output pad flow_control: :manual, which is incompatible. I perused the Engine code, and I assume it's doing something to mediate between the two flow controls, though looking at the code in membrane webrtc engine ( engine.ex tee.ex and filter_tee.ex) it wasn't obvious to me how that was being done. What do you advise? Should I build a dummy element in my testing pipeline that just mediates between the Testing.Source element and one that expects a push? Or I could make a copy of TrackReceiver that has the pad definitions set to :auto and just roll with it. Though I don't fully understand the implications here, so I'm a little worried about doing that and producing some subtle bug in the future. I appreciate the advice.

Did you find this page helpful?