Kevin
Kevin
CDCloudflare Developers
Created by Kevin on 3/24/2025 in #workers-help
"Error: Network connection lost." on SSE subrequest idling for 100 seconds between events
A deployed Cloudflare Worker throws "Error: Network connection lost." while reading a subrequest stream obtained with fetch() if there are more than 100 seconds between server-sent events. This behavior isn't documented anywhere and is quite frustrating. I was able to reproduce it every time as follows: 1. Create a simple Node.js service that sends SSE chunks back, waiting 2 minutes between chunks. 2. Run the Node.js service locally and use ngrok to get a forwarded URL. 3. Create and deploy a Cloudflare Worker that sends a fetch request to the ngrok URL and reads from the streaming response. While the Worker is reading from the streaming response, an error gets thrown. Here is the Cloudflare Worker:
/**
 * Worker request handler that proxies a server-sent-event stream from the
 * benchmark server to the client, logging every chunk on the way through.
 *
 * Routes:
 *   - `/v1/benchmark`  -> streams the upstream SSE response (200), or relays
 *                         the upstream error status as a JSON body.
 *   - anything else    -> 404 JSON `{ error: 'Not found' }`.
 *
 * @param {Request} request - Incoming request; only `request.url` is read.
 * @param {unknown} env - Unused.
 * @param {unknown} ctx - Unused.
 * @returns {Promise<Response>} Streaming SSE response or a JSON error response.
 */
export default async function fetch(request, env, ctx) {
  // Small helper so every JSON error response is built the same way.
  const jsonResponse = (payload, status) =>
    new Response(JSON.stringify(payload), {
      status,
      headers: { 'Content-Type': 'application/json' }
    });

  // Error's own `message`/`stack` are non-enumerable, so spreading an Error
  // (`{ ...error }`) logs an empty object. Pick the useful fields explicitly.
  const describeError = (error) => ({
    name: error?.name,
    message: error?.message,
    stack: error?.stack,
    cause: error?.cause
  });

  try {
    const url = new URL(request.url);
    if (url.pathname !== '/v1/benchmark') {
      return jsonResponse({ error: 'Not found' }, 404);
    }

    // Benchmark endpoint URL
    const benchmarkUrl = "https://.<your ngrok url>.ngrok-free.app";

    console.log(`Making benchmark request to: ${benchmarkUrl}`);

    // This handler is itself named `fetch`, so a bare `fetch(...)` here would
    // call the handler recursively instead of making a subrequest. Go through
    // `globalThis` to reach the runtime's real fetch.
    const benchResponse = await globalThis.fetch(benchmarkUrl, {
      method: 'GET',
      headers: {
        'Accept': 'text/event-stream'
      }
    });

    if (!benchResponse.ok) {
      console.error(`Benchmark server returned status: ${benchResponse.status}`);
      return jsonResponse(
        { error: `Benchmark server error: ${benchResponse.status}` },
        benchResponse.status
      );
    }

    // A successful response may still carry no body; without this guard the
    // pump below would throw outside its try/catch and the client stream
    // would never be closed.
    if (!benchResponse.body) {
      console.error('Benchmark server returned an empty body');
      return jsonResponse({ error: 'Benchmark server returned no body' }, 502);
    }

    // Identity TransformStream: we write upstream chunks into `writable` and
    // hand `readable` to the client.
    const { readable, writable } = new TransformStream();

    // Copies upstream chunks to the client until the upstream ends or fails.
    // Never rejects: all failures are logged and propagated to the client
    // stream via abort(), so it is safe to leave this promise un-awaited.
    const pumpStream = async () => {
      const reader = benchResponse.body.getReader();
      const writer = writable.getWriter();
      const decoder = new TextDecoder();

      try {
        while (true) {
          // NOTE(review): this read() is where the accompanying report sees
          // "Network connection lost" after a long idle gap between events.
          const { done, value } = await reader.read();
          if (done) break;

          // Decode only for logging; the raw bytes are what get forwarded.
          const chunk = decoder.decode(value, { stream: true });
          console.log(`Benchmark data chunk: ${chunk}`);

          await writer.write(value);
        }

        // Flush any bytes of an incomplete multi-byte sequence still buffered.
        const tail = decoder.decode();
        if (tail) {
          console.log(`Benchmark data chunk: ${tail}`);
        }

        console.log("Benchmark streaming completed");
        await writer.close();
      } catch (error) {
        console.error('Error in benchmark stream:', error);
        console.error('Error details:', describeError(error));

        // Stop pulling from upstream (matters when the failure was on the
        // client side); cancel() rejects if the stream is already errored.
        try {
          await reader.cancel(error);
        } catch (cancelError) {
          console.error('Error cancelling benchmark stream:', cancelError);
        }

        // abort() can reject too (e.g. the writable is already errored);
        // swallowing it here would hide it, so log and move on.
        try {
          await writer.abort(error);
        } catch (abortError) {
          console.error('Error aborting client stream:', abortError);
        }
      }
    };

    // Intentionally not awaited: the response must be returned immediately
    // while the pump keeps feeding it in the background.
    void pumpStream();

    return new Response(readable, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive'
      }
    });
  } catch (error) {
    console.error('Fetch error:', error);
    return jsonResponse({ error: error.message }, 500);
  }
}
/**
 * Worker request handler that proxies a server-sent-event stream from the
 * benchmark server to the client, logging every chunk on the way through.
 *
 * Routes:
 *   - `/v1/benchmark`  -> streams the upstream SSE response (200), or relays
 *                         the upstream error status as a JSON body.
 *   - anything else    -> 404 JSON `{ error: 'Not found' }`.
 *
 * @param {Request} request - Incoming request; only `request.url` is read.
 * @param {unknown} env - Unused.
 * @param {unknown} ctx - Unused.
 * @returns {Promise<Response>} Streaming SSE response or a JSON error response.
 */
export default async function fetch(request, env, ctx) {
  // Small helper so every JSON error response is built the same way.
  const jsonResponse = (payload, status) =>
    new Response(JSON.stringify(payload), {
      status,
      headers: { 'Content-Type': 'application/json' }
    });

  // Error's own `message`/`stack` are non-enumerable, so spreading an Error
  // (`{ ...error }`) logs an empty object. Pick the useful fields explicitly.
  const describeError = (error) => ({
    name: error?.name,
    message: error?.message,
    stack: error?.stack,
    cause: error?.cause
  });

  try {
    const url = new URL(request.url);
    if (url.pathname !== '/v1/benchmark') {
      return jsonResponse({ error: 'Not found' }, 404);
    }

    // Benchmark endpoint URL
    const benchmarkUrl = "https://.<your ngrok url>.ngrok-free.app";

    console.log(`Making benchmark request to: ${benchmarkUrl}`);

    // This handler is itself named `fetch`, so a bare `fetch(...)` here would
    // call the handler recursively instead of making a subrequest. Go through
    // `globalThis` to reach the runtime's real fetch.
    const benchResponse = await globalThis.fetch(benchmarkUrl, {
      method: 'GET',
      headers: {
        'Accept': 'text/event-stream'
      }
    });

    if (!benchResponse.ok) {
      console.error(`Benchmark server returned status: ${benchResponse.status}`);
      return jsonResponse(
        { error: `Benchmark server error: ${benchResponse.status}` },
        benchResponse.status
      );
    }

    // A successful response may still carry no body; without this guard the
    // pump below would throw outside its try/catch and the client stream
    // would never be closed.
    if (!benchResponse.body) {
      console.error('Benchmark server returned an empty body');
      return jsonResponse({ error: 'Benchmark server returned no body' }, 502);
    }

    // Identity TransformStream: we write upstream chunks into `writable` and
    // hand `readable` to the client.
    const { readable, writable } = new TransformStream();

    // Copies upstream chunks to the client until the upstream ends or fails.
    // Never rejects: all failures are logged and propagated to the client
    // stream via abort(), so it is safe to leave this promise un-awaited.
    const pumpStream = async () => {
      const reader = benchResponse.body.getReader();
      const writer = writable.getWriter();
      const decoder = new TextDecoder();

      try {
        while (true) {
          // NOTE(review): this read() is where the accompanying report sees
          // "Network connection lost" after a long idle gap between events.
          const { done, value } = await reader.read();
          if (done) break;

          // Decode only for logging; the raw bytes are what get forwarded.
          const chunk = decoder.decode(value, { stream: true });
          console.log(`Benchmark data chunk: ${chunk}`);

          await writer.write(value);
        }

        // Flush any bytes of an incomplete multi-byte sequence still buffered.
        const tail = decoder.decode();
        if (tail) {
          console.log(`Benchmark data chunk: ${tail}`);
        }

        console.log("Benchmark streaming completed");
        await writer.close();
      } catch (error) {
        console.error('Error in benchmark stream:', error);
        console.error('Error details:', describeError(error));

        // Stop pulling from upstream (matters when the failure was on the
        // client side); cancel() rejects if the stream is already errored.
        try {
          await reader.cancel(error);
        } catch (cancelError) {
          console.error('Error cancelling benchmark stream:', cancelError);
        }

        // abort() can reject too (e.g. the writable is already errored);
        // swallowing it here would hide it, so log and move on.
        try {
          await writer.abort(error);
        } catch (abortError) {
          console.error('Error aborting client stream:', abortError);
        }
      }
    };

    // Intentionally not awaited: the response must be returned immediately
    // while the pump keeps feeding it in the background.
    void pumpStream();

    return new Response(readable, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive'
      }
    });
  } catch (error) {
    console.error('Fetch error:', error);
    return jsonResponse({ error: error.message }, 500);
  }
}
3 replies