Cloudflare Worker is waiting for the

Cloudflare Worker is waiting for the whole response before streaming the output to the client, which defeats the entire point of streaming. I don't know what I am doing wrong.
router.post("/v1/streaming", async (request, { OPENAI_API }) => {
let { readable, writable } = new TransformStream();
const client = new ChatOpenAI({
openAIApiKey: OPENAI_API,
});

const stream = await client.stream(
"Write a 2 sentence poem about a sunset."
);
let writer = writable.getWriter();
const textEncoder = new TextEncoder();

for await (const part of stream) {
if (part.content.length) {
// Assuming part.content is a string containing the text
const words = part.content.split(" "); // Split the text into words
for (const word of words) {
const sseFormattedData = `data: ${word}\n\n`; // Format each word as SSE
await writer.write(textEncoder.encode(sseFormattedData));
console.log("🚀 - forawait - sseFormattedData:", sseFormattedData);
await new Promise((r) => setTimeout(r, 1000)); // Wait for 1 second before sending the next word
}
}
}

writer.close();

return new Response(readable, {
// headers: {
// "Content-Type": "text/event-stream",
// "Cache-Control": "no-cache",
// Connection: "keep-alive",
// },
});
})
router.post("/v1/streaming", async (request, { OPENAI_API }) => {
let { readable, writable } = new TransformStream();
const client = new ChatOpenAI({
openAIApiKey: OPENAI_API,
});

const stream = await client.stream(
"Write a 2 sentence poem about a sunset."
);
let writer = writable.getWriter();
const textEncoder = new TextEncoder();

for await (const part of stream) {
if (part.content.length) {
// Assuming part.content is a string containing the text
const words = part.content.split(" "); // Split the text into words
for (const word of words) {
const sseFormattedData = `data: ${word}\n\n`; // Format each word as SSE
await writer.write(textEncoder.encode(sseFormattedData));
console.log("🚀 - forawait - sseFormattedData:", sseFormattedData);
await new Promise((r) => setTimeout(r, 1000)); // Wait for 1 second before sending the next word
}
}
}

writer.close();

return new Response(readable, {
// headers: {
// "Content-Type": "text/event-stream",
// "Cache-Control": "no-cache",
// Connection: "keep-alive",
// },
});
})
1 Reply
anurag
anuragOP•12mo ago
We're using ittyr-router let me see how to do wait until in it .waitUntil isn't available in this router but I'm passing it as the third param in the handleRequest
router.post("/v1/streaming", async (request, { OPENAI_API }, context) => {
console.log("🚀 - .post - context:", context.waitUntil); // undefined
})

export const handleRequest = async (request: Request, env, context) => {
console.log("🚀 - handleRequest - context:", context.waitUntil); // available here
const posthog = new Posthog(env.POSTHOG_KEY, env.NODE_ENV);
const start = Date.now();
const response = await router.handle(request, env, {
...context,
posthog,
});

const duration = Date.now() - start;
console.log(
`API ${request.method} ${request.url} took ${duration} ms - ${response.status}`
);

// if (origin && validCorsURLs.some((route) => origin?.includes(route))) {
response.headers.set("Access-Control-Allow-Origin", "*");
response.headers.set("Access-Control-Allow-Methods", "GET, OPTIONS");
response.headers.set("Access-Control-Allow-Headers", "Content-Type");
// }

return response;
}
router.post("/v1/streaming", async (request, { OPENAI_API }, context) => {
console.log("🚀 - .post - context:", context.waitUntil); // undefined
})

export const handleRequest = async (request: Request, env, context) => {
console.log("🚀 - handleRequest - context:", context.waitUntil); // available here
const posthog = new Posthog(env.POSTHOG_KEY, env.NODE_ENV);
const start = Date.now();
const response = await router.handle(request, env, {
...context,
posthog,
});

const duration = Date.now() - start;
console.log(
`API ${request.method} ${request.url} took ${duration} ms - ${response.status}`
);

// if (origin && validCorsURLs.some((route) => origin?.includes(route))) {
response.headers.set("Access-Control-Allow-Origin", "*");
response.headers.set("Access-Control-Allow-Methods", "GET, OPTIONS");
response.headers.set("Access-Control-Allow-Headers", "Content-Type");
// }

return response;
}
I removed the context overide code and now I have context.waitUntiil the router but I don't understand why waitUntil would work in this case. Because I'm not sending any response at the end of router v1/streaming it's going into the global catch-all route of 404 so the client says 404 but I'm getting the streamed data in the console not the frontend.
.post("/v1/streaming", async (request, { OPENAI_API }, context) => {
console.log("🚀 - .post - context:", context);
console.log("🚀 - .post - context:", context.waitUntil);
let { readable, writable } = new TransformStream();
const client = new ChatOpenAI({
openAIApiKey: OPENAI_API,
});

const stream = await client.stream(
"Write a 2 sentence poem about a sunset."
);
let writer = writable.getWriter();
const textEncoder = new TextEncoder();

context.waitUntil(
new Promise(async (resolve) => {
for await (const part of stream) {
console.log("🚀 - forawait - part:", part);
if (part.content.length) {
// Assuming part.content is a string containing the text
const words = part.content.split(" "); // Split the text into words
for (const word of words) {
const sseFormattedData = `data: ${word}\n\n`; // Format each word as SSE
await writer.write(textEncoder.encode(sseFormattedData));
console.log(
"🚀 - forawait - sseFormattedData:",
sseFormattedData
);
await new Promise((r) => setTimeout(r, 1000)); // Wait for 1 second before sending the next word
}
}
}

writer.close();

resolve("ok");
})
);

})
.post("/v1/streaming", async (request, { OPENAI_API }, context) => {
console.log("🚀 - .post - context:", context);
console.log("🚀 - .post - context:", context.waitUntil);
let { readable, writable } = new TransformStream();
const client = new ChatOpenAI({
openAIApiKey: OPENAI_API,
});

const stream = await client.stream(
"Write a 2 sentence poem about a sunset."
);
let writer = writable.getWriter();
const textEncoder = new TextEncoder();

context.waitUntil(
new Promise(async (resolve) => {
for await (const part of stream) {
console.log("🚀 - forawait - part:", part);
if (part.content.length) {
// Assuming part.content is a string containing the text
const words = part.content.split(" "); // Split the text into words
for (const word of words) {
const sseFormattedData = `data: ${word}\n\n`; // Format each word as SSE
await writer.write(textEncoder.encode(sseFormattedData));
console.log(
"🚀 - forawait - sseFormattedData:",
sseFormattedData
);
await new Promise((r) => setTimeout(r, 1000)); // Wait for 1 second before sending the next word
}
}
}

writer.close();

resolve("ok");
})
);

})

Did you find this page helpful?