Real-time video stream served to multiple users

Hi all, this is what I've found to be a very unique problem, and I believe it's a React client-side issue; I'll do my best to explain. For work we integrate other companies' RTSP/HTTP cameras into our software, and we need to serve those camera feeds on our web app. I'm using ffmpeg to convert the RTSP/HTTP stream to MP4, piping that output to RabbitMQ, and publishing the messages using pub/sub. My Next.js API Route Handler is the subscriber of the RMQ message broker: it consumes the messages from RMQ and streams them to my client component in the browser. (It could also be my lack of understanding of how Next.js API Routes work.) The issue I'm having is that the first user to start a stream gets video, but every user after that is able to read the video chunks yet never gets any playable video data back.
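For context, the publisher side of the pipeline described above could look roughly like this. This is a hedged sketch, not the thread author's code: `exchangeNameFor`, `publishCamera`, the RTSP URL handling, and the `Channel` stub are all assumptions; only the `stream.${id}` exchange naming mirrors the route handler posted below.

```typescript
// Hypothetical publisher sketch: ffmpeg converts an RTSP feed to fragmented
// MP4 on stdout, and each stdout chunk is published to a RabbitMQ fanout
// exchange. All names here are illustrative.
import { spawn, type ChildProcess } from "node:child_process";

// Minimal structural stub of amqplib's Channel, just enough for the sketch
// to be self-contained (the real app would import it from "amqplib").
type Channel = {
  publish(exchange: string, routingKey: string, content: Buffer): boolean;
};

// One exchange per camera, mirroring the `stream.${params.id}` convention
// used in the route handler.
export function exchangeNameFor(cameraId: string): string {
  return `stream.${cameraId}`;
}

export function publishCamera(
  channel: Channel,
  cameraId: string,
  rtspUrl: string,
): ChildProcess {
  const exchange = exchangeNameFor(cameraId);

  // -movflags makes ffmpeg emit fragmented MP4, which can be streamed
  // chunk-by-chunk instead of requiring a seekable output file.
  const ffmpeg = spawn("ffmpeg", [
    "-i", rtspUrl,
    "-c:v", "copy",
    "-movflags", "frag_keyframe+empty_moov+default_base_moof",
    "-f", "mp4",
    "pipe:1",
  ]);

  // Every chunk fans out to all currently-bound subscriber queues.
  ffmpeg.stdout!.on("data", (chunk: Buffer) => {
    channel.publish(exchange, "", chunk);
  });

  return ffmpeg;
}
```

Note that a fanout exchange only delivers chunks published *after* a queue is bound, which matters for the symptom described above.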
6 Replies
NinjaBunny
NinjaBunnyOP•7d ago
// stream/[id]/route.ts
import { NextRequest, NextResponse } from "next/server";
import { nanoid } from "nanoid";
import type { Channel, Connection } from "amqplib";
// getTokens, getChannel, cleanup, and DEFAULT_COOKIE_OPTIONS are app-level
// helpers defined elsewhere.

// NOTE: module-level state is shared by every request hitting this route;
// each GET overwrites `channel`.
let connection: Connection | null = null;
let channel: Channel | null = null;

export async function GET(
  req: NextRequest,
  { params }: { params: { id: string } },
) {
  // Gets or refreshes user tokens, or returns a response error.
  const tokens = await getTokens(req);

  if (tokens instanceof NextResponse) return tokens;

  channel = await getChannel();

  const exchange = `stream.${params.id}`;
  const session = nanoid();
  const queueName = `${exchange}.${session}`;

  try {
    await channel.assertExchange(exchange, "fanout", { durable: false });

    // Each viewer gets its own exclusive, auto-deleted queue bound to the
    // camera's fanout exchange.
    const { queue } = await channel.assertQueue(queueName, {
      exclusive: true,
      autoDelete: true,
    });
    await channel.bindQueue(queue, exchange, "");

    let consumerTag: string | undefined;

    const stream = new ReadableStream({
      async start(controller) {
        const consumer = await channel?.consume(queue, (message) => {
          if (message) {
            controller.enqueue(message.content);
          }
        });
        consumerTag = consumer?.consumerTag;
      },
      async cancel() {
        // channel.cancel expects the consumer tag, not the queue name.
        if (consumerTag) await channel?.cancel(consumerTag);

        await cleanup();
      },
    });

    const headers = new Headers();

    headers.set("Content-Type", "text/event-stream");
    headers.set("Transfer-Encoding", "chunked");

    const response = new NextResponse(stream, {
      headers,
    });

    if (typeof tokens.data !== "string") {
      response.cookies.set("access_token", tokens.data.at.value, {
        ...DEFAULT_COOKIE_OPTIONS,
        expires: tokens.data.at.expires,
      });

      response.cookies.set("refresh_token", tokens.data.rt.value, {
        ...DEFAULT_COOKIE_OPTIONS,
        expires: tokens.data.rt.expires,
      });
    }

    return response;
  } catch (error) {
    console.error("Error in stream setup", error);

    return NextResponse.json(
      { success: false, error: "Failed to start stream" } as const,
      { status: 500 },
    );
  }
}
I don't really think this is a backend issue at all; it seems like a very niche browser issue around MediaSource and handling video chunks properly.
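For reference, the browser side hinted at above might look roughly like this. This is a minimal sketch under assumptions: `playStream`, the codec string, and the chunk handling are illustrative, not the thread author's actual client component.

```typescript
// Hypothetical client-side sketch: read the chunked response and feed it to
// a MediaSource/SourceBuffer. All names and the codec string are
// illustrative assumptions.

// An MP4 initialization segment starts with an "ftyp" box: bytes 4-7 of the
// first box spell out its four-character type.
export function isInitSegment(chunk: Uint8Array): boolean {
  if (chunk.length < 8) return false;
  const type = String.fromCharCode(chunk[4], chunk[5], chunk[6], chunk[7]);
  return type === "ftyp";
}

export function playStream(video: HTMLVideoElement, url: string): void {
  const mediaSource = new MediaSource();
  video.src = URL.createObjectURL(mediaSource);

  mediaSource.addEventListener("sourceopen", async () => {
    const sourceBuffer = mediaSource.addSourceBuffer(
      'video/mp4; codecs="avc1.64001f"', // assumption: H.264 video only
    );

    const response = await fetch(url);
    const reader = response.body!.getReader();

    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;

      // appendBuffer is asynchronous: wait for the previous append to
      // finish before queueing the next chunk.
      if (sourceBuffer.updating) {
        await new Promise((resolve) =>
          sourceBuffer.addEventListener("updateend", resolve, { once: true }),
        );
      }
      sourceBuffer.appendBuffer(value);
    }
  });
}
```

A SourceBuffer that never receives the initialization segment (the "ftyp"/"moov" data) will accept bytes but never produce decodable video, which matches the "can read chunks but gets no video" symptom.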
OreQr
OreQr•7d ago
Hi @NinjaBunny, why don't you just use HLS streaming? You could skip the whole RabbitMQ pipeline. And when you do need it, you can just send video/mp4 instead of text/event-stream.
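As a rough illustration of that suggestion: ffmpeg can segment an RTSP feed into an HLS playlist plus rolling segment files that any static file route can serve. The paths and flag values below are typical defaults chosen for the sketch, not something from the thread.

```typescript
// Hypothetical sketch: build the ffmpeg arguments for HLS output. The
// playlist lands in outDir/index.m3u8 with short, rolling .ts segments.
export function hlsArgs(rtspUrl: string, outDir: string): string[] {
  return [
    "-i", rtspUrl,
    "-c:v", "copy",            // no re-encode; pass the camera's H.264 through
    "-f", "hls",
    "-hls_time", "2",          // ~2-second segments
    "-hls_list_size", "5",     // keep a short rolling playlist window
    "-hls_flags", "delete_segments", // drop segments that left the playlist
    `${outDir}/index.m3u8`,
  ];
}
```

Usage would be something like `spawn("ffmpeg", hlsArgs("rtsp://camera-host/feed", "/var/hls/cam42"))`; every viewer then just fetches the same playlist, which sidesteps the per-subscriber queue problem entirely.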
NinjaBunny
NinjaBunnyOP•7d ago
Actually, that's exactly what I did today at work, because I found out earlier today that I can't use MP4 at all, due to the first chunk or two carrying the data describing the actual video stream. I tried HLS in the past, but it wasn't working because I was not interacting with the browser, lol, and I forgot I was trying to have it autoplay the first time I used HLS.
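That point about the first chunk or two is exactly why late subscribers on a fanout exchange get bytes but no playable video: they never receive the MP4 initialization data. One possible workaround (a sketch only, not what the thread settled on, and `InitSegmentCache`/`INIT_CHUNKS` are invented names) is to cache those first chunks on the publisher and replay them to each new subscriber before live data:

```typescript
// Hypothetical sketch: remember the stream's first chunks (the ftyp/moov
// initialization data) so a viewer who joins late can be sent them first.
const INIT_CHUNKS = 2; // assumption: init data fits in the first two chunks

export class InitSegmentCache {
  private chunks: Buffer[] = [];

  // Call for every chunk the publisher emits; only the first few are kept.
  record(chunk: Buffer): void {
    if (this.chunks.length < INIT_CHUNKS) this.chunks.push(chunk);
  }

  // The chunks a brand-new subscriber must receive before any live chunk.
  prelude(): Buffer[] {
    return [...this.chunks];
  }
}
```

In the route handler, the `start` callback would enqueue `cache.prelude()` into the ReadableStream before wiring up `channel.consume`, so every viewer's SourceBuffer starts from a decodable state.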
Madvacska
Madvacska•7d ago
Maybe using something like Video.js on the client side would help you play the HLS stream in the browser easily. I'm not saying it's the best option out there, but it sure works 😅 One of our company's old apps uses it. It would also help with autoplay.
adnan
adnan•2d ago
The last company I was with was using Wowza server to stream videos. I think it uses WebRTC in the background; it was pretty solid.