How to pass some client side parameters to an RTMP pipeline

Hey team I have an RTMP pipeline (which I simplified for the purpose of the question):
# Pipeline init callback: builds the RTMP ingest chain for a single client socket.
#
# The options must be a keyword list containing exactly `socket: socket`; any other
# shape fails to match this clause. The callback context is not needed here, so it
# is bound as `_ctx` to avoid an unused-variable warning.
#
# Returns the spec and playback actions together with an empty map as initial state.
def handle_init(_ctx, socket: socket) do
  Logger.info("Starting RTMP pipeline")

  structure = [
    child(:source, %Membrane.RTMP.Source{
      socket: socket,
      validator: Membrane.RTMP.DefaultMessageValidator
    })
    |> child(:demuxer, Membrane.FLV.Demuxer)
    |> via_out(Pad.ref(:audio, 0))
    # we use a fake sink to terminate the pipeline
    |> child(:fake_sink, Membrane.Fake.Sink.Buffers)
  ]

  {[spec: structure, playback: :playing], %{}}
end
# Pipeline init callback: builds the RTMP ingest chain for a single client socket.
#
# The options must be a keyword list containing exactly `socket: socket`; any other
# shape fails to match this clause. The callback context is not needed here, so it
# is bound as `_ctx` to avoid an unused-variable warning.
#
# Returns the spec and playback actions together with an empty map as initial state.
def handle_init(_ctx, socket: socket) do
  Logger.info("Starting RTMP pipeline")

  structure = [
    child(:source, %Membrane.RTMP.Source{
      socket: socket,
      validator: Membrane.RTMP.DefaultMessageValidator
    })
    |> child(:demuxer, Membrane.FLV.Demuxer)
    |> via_out(Pad.ref(:audio, 0))
    # we use a fake sink to terminate the pipeline
    |> child(:fake_sink, Membrane.Fake.Sink.Buffers)
  ]

  {[spec: structure, playback: :playing], %{}}
end
Which is served by a TCP server:
{Membrane.RTMP.Source.TcpServer, rtmp_server_options()},
{Membrane.RTMP.Source.TcpServer, rtmp_server_options()},
# Assembles the `Membrane.RTMP.Source.TcpServer` struct used as the server's start
# argument: the listening port, the listen options, and a handler that starts one
# `MyPipeline.Pipeline` per accepted socket.
defp rtmp_server_options do
  listen_port = rtmp_port()

  tcp_options = [:binary, packet: :raw, active: false, ip: rtmp_ip()]

  # On new connection a pipeline is started
  start_pipeline = fn client_socket ->
    {:ok, _supervisor, pipeline_pid} = MyPipeline.Pipeline.start_link(socket: client_socket)
    {:ok, pipeline_pid}
  end

  %Membrane.RTMP.Source.TcpServer{
    port: listen_port,
    listen_options: tcp_options,
    socket_handler: start_pipeline
  }
end
# Assembles the `Membrane.RTMP.Source.TcpServer` struct used as the server's start
# argument: the listening port, the listen options, and a handler that starts one
# `MyPipeline.Pipeline` per accepted socket.
defp rtmp_server_options do
  listen_port = rtmp_port()

  tcp_options = [:binary, packet: :raw, active: false, ip: rtmp_ip()]

  # On new connection a pipeline is started
  start_pipeline = fn client_socket ->
    {:ok, _supervisor, pipeline_pid} = MyPipeline.Pipeline.start_link(socket: client_socket)
    {:ok, pipeline_pid}
  end

  %Membrane.RTMP.Source.TcpServer{
    port: listen_port,
    listen_options: tcp_options,
    socket_handler: start_pipeline
  }
end
This works great: I can initiate an RTMP session using this kind of URL, for example: rtmp://localhost:5000. Now I'm trying to use a URL like this: rtmp://localhost:5000?rtmp_id=xxxx or even something like this: rtmp://localhost:5000/xxxx/ so that within my pipeline I can get this xxxx and use it for different things (a filename, for example, would be a nice use case). So far I haven't managed to find how I can get this xxxx within my pipeline; happy to know if there is another way to do that 😄! Thanks
2 Replies
enzoqtvf
enzoqtvfOP•2y ago
Actually, in case it helps other people: so far I have found that I can see this information in the RTMP connect message, like this:
%Membrane.RTMP.Messages.Connect{
app: "call_control_id=enzoteststs&test=22",
type: "nonprivate",
supports_go_away: false,
flash_version: "FMLE/3.0 (compatible; Lavf59.27.100)",
swf_url: "rtmp://localhost:5002/call_control_id=enzoteststs&test=22",
tc_url: "rtmp://localhost:5002/call_control_id=enzoteststs&test=22",
tx_id: 1.0
}
%Membrane.RTMP.Messages.Connect{
app: "call_control_id=enzoteststs&test=22",
type: "nonprivate",
supports_go_away: false,
flash_version: "FMLE/3.0 (compatible; Lavf59.27.100)",
swf_url: "rtmp://localhost:5002/call_control_id=enzoteststs&test=22",
tc_url: "rtmp://localhost:5002/call_control_id=enzoteststs&test=22",
tx_id: 1.0
}
Wondering if modifying the message_handler inside the RTMP plugin library like this would make sense:
# Handles the client's RTMP `Connect` message.
#
# Sends the window acknowledgement, peer bandwidth, stream-begin user control and
# chunk size messages, followed by the connection-success and on-bandwidth-done
# responses. It then appends a `{:notify_parent, {:rtmp_app, app}}` action to the
# state so the parent can read the `app` field of the connect message.
#
# Only `app` is extracted from the message; the whole struct is not bound because
# nothing else in it is used here.
#
# Returns `{:cont, state}` with the updated message parser and actions.
defp do_handle_client_message(%Messages.Connect{app: app}, _header, state) do
  chunk_size = state.message_parser.chunk_size

  [
    %Messages.WindowAcknowledgement{size: @windows_acknowledgment_size},
    %Messages.SetPeerBandwidth{size: @peer_bandwidth_size},
    # stream begin type
    %Messages.UserControl{event_type: 0x00, data: <<0, 0, 0, 0>>},
    # by default the ffmpeg server uses 128 chunk size
    %Messages.SetChunkSize{chunk_size: chunk_size}
  ]
  |> Enum.each(&send_rtmp_payload(&1, state.socket, chunk_size))

  {[tx_id], message_parser} = MessageParser.generate_tx_ids(state.message_parser, 1)

  tx_id
  |> Responses.connection_success()
  |> send_rtmp_payload(state.socket, chunk_size, chunk_stream_id: 3)

  Responses.on_bw_done()
  |> send_rtmp_payload(state.socket, chunk_size, chunk_stream_id: 3)

  {:cont,
   %{
     state
     | message_parser: message_parser,
       # appended last so the notification follows any previously queued actions
       actions: state.actions ++ [{:notify_parent, {:rtmp_app, app}}]
   }}
end
# Handles the client's RTMP `Connect` message.
#
# Sends the window acknowledgement, peer bandwidth, stream-begin user control and
# chunk size messages, followed by the connection-success and on-bandwidth-done
# responses. It then appends a `{:notify_parent, {:rtmp_app, app}}` action to the
# state so the parent can read the `app` field of the connect message.
#
# Only `app` is extracted from the message; the whole struct is not bound because
# nothing else in it is used here.
#
# Returns `{:cont, state}` with the updated message parser and actions.
defp do_handle_client_message(%Messages.Connect{app: app}, _header, state) do
  chunk_size = state.message_parser.chunk_size

  [
    %Messages.WindowAcknowledgement{size: @windows_acknowledgment_size},
    %Messages.SetPeerBandwidth{size: @peer_bandwidth_size},
    # stream begin type
    %Messages.UserControl{event_type: 0x00, data: <<0, 0, 0, 0>>},
    # by default the ffmpeg server uses 128 chunk size
    %Messages.SetChunkSize{chunk_size: chunk_size}
  ]
  |> Enum.each(&send_rtmp_payload(&1, state.socket, chunk_size))

  {[tx_id], message_parser} = MessageParser.generate_tx_ids(state.message_parser, 1)

  tx_id
  |> Responses.connection_success()
  |> send_rtmp_payload(state.socket, chunk_size, chunk_stream_id: 3)

  Responses.on_bw_done()
  |> send_rtmp_payload(state.socket, chunk_size, chunk_stream_id: 3)

  {:cont,
   %{
     state
     | message_parser: message_parser,
       # appended last so the notification follows any previously queued actions
       actions: state.actions ++ [{:notify_parent, {:rtmp_app, app}}]
   }}
end
That way I can get this information from my pipeline (note the addition of the actions)
enzoqtvf
enzoqtvfOP•2y ago
I made a PR: https://github.com/membraneframework/membrane_rtmp_plugin/pull/57. It might not be the best way to do it, but it solved the issue in my case 🙂
GitHub
Send to parent the RTMP connect message by enzoqtvf · Pull Request ...
Hey team, this is not really a finished PR but something that I would like to have from this library. In this PR I'm modifying the message_handler to send the RTMP.Message.Connect to the parent...

Did you find this page helpful?