Serialized RPC arguments or return values are limited to 1MiB

While moving to a Service Binding-oriented architecture between Workers, I bumped into the following error: "Serialized RPC arguments or return values are limited to 1MiB". It happens when transmitting a large dataset through a service binding to the worker that handles DB operations. The array of data (trimmed down as much as possible, to an array of arrays) is just over 1 MB in size. It feels strange that I may need to break this data up into multiple RPC calls to get past this limitation when doing it over HTTP worked just fine.
1. Is it possible to raise this limit for a worker?
2. Do I have other options?
Thanks!
9 Replies
DaniFoldi
DaniFoldi • 4mo ago
Hi, I think you should be able to pass larger amounts of data with streams. I haven't specifically tested the size limit, but they should work with any size, even larger than the Worker's memory (unbuffered, of course, which doesn't count against the limit here). So if, instead of passing an array as the argument, you pass in a ReadableStream, the service binding side can read whatever you pipe in from the caller, and vice versa.
Murder Chicken
Murder Chicken (OP) • 4mo ago
@Purple Blob thanks for the input. Are you aware of any code examples of how to do this with RPC service bindings? Also, the documentation notes that the Streams API is only available within a fetch event. Do I have this or other options within a scheduled event?
For anyone who stumbles into this thread... I managed to figure this out (thanks, Cursor IDE). Worker making the RPC call with a large payload...
const data = [[data], [data], [data], ...];
const encoder = new TextEncoder();
const stream = new ReadableStream({
  start(controller) {
    data.forEach(chunk => {
      const bytes = encoder.encode(JSON.stringify(chunk) + '\n');
      controller.enqueue(bytes);
    });
    controller.close();
  }
});

// Send to other worker RPC method through Service Binding...
await env.SERVICE_BINDING.rpcMethod(stream);
Consumer worker with RPC method...
async rpcMethod(stream) {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let data = [];

  try {
    while (true) {
      // Read the next chunk; treat a read error as end-of-stream
      const { done, value } = await reader.read().catch(error => {
        console.warn('Stream read error:', error);
        return { done: true, value: undefined };
      });

      if (done) {
        // Parse whatever is left in the buffer after the last newline
        if (buffer) {
          try {
            data.push(JSON.parse(buffer));
          } catch (error) {
            console.warn('Error parsing remaining buffer:', error);
          }
        }
        break;
      }

      // Decode the chunk and split the buffer into complete NDJSON lines
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // keep the trailing partial line for the next chunk

      for (const line of lines) {
        if (line) {
          try {
            data.push(JSON.parse(line));
          } catch (error) {
            console.warn('Error parsing JSON:', error, 'Line:', line);
          }
        }
      }
    }
  } catch (error) {
    console.error('Stream processing error:', error);
  } finally {
    reader.releaseLock();
  }

  // Do whatever with the data
}
Warren
Warren • 2mo ago
Hey, I just want to revive this discussion on a related note. Instead of streaming, I'm trying to modify the work I'm doing within the Worker so that the return value is smaller than the 1MiB limit before returning it.
export function getApproximateSizeInBytes(obj: unknown) {
  return new TextEncoder().encode(JSON.stringify(obj)).length;
}
I use the above, but I notice that the value it returns and the actual value shown when the error is thrown can be quite different. How can I reliably check the size of a return value before returning it? Thanks in advance! @DaniFoldi
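(In the absence of an exact measurement, one possible heuristic is to treat the JSON estimate as approximate and leave a safety margin before deciding to split or stream. This is only a sketch: the RPC serializer is not JSON, and the limit constant and margin below are illustrative assumptions, not values exposed by the runtime.)

// Sketch: approximate size check with a safety margin, since the exact
// serialized size of an RPC value isn't exposed by the runtime.
const RPC_LIMIT_BYTES = 1024 * 1024; // documented 1MiB limit
const SAFETY_FACTOR = 0.8;           // illustrative margin for serializer overhead

export function getApproximateSizeInBytes(obj: unknown): number {
  return new TextEncoder().encode(JSON.stringify(obj)).length;
}

export function fitsWithinRpcLimit(obj: unknown): boolean {
  // Only treat the value as safe when the estimate is comfortably below the limit
  return getApproximateSizeInBytes(obj) < RPC_LIMIT_BYTES * SAFETY_FACTOR;
}

(If fitsWithinRpcLimit returns false, you could fall back to chunking or streaming as discussed later in this thread.)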
Ahz
Ahz • 3w ago
I have a different approach. If my call fails because the arguments exceed the 1MiB limit, I call a separate handler on the RPC target that accepts the args in chunks, reassembles them, and calls the original function locally, where the 1MiB limit doesn't apply (a rough sketch of that side is below). If my return fails due to a large response size, I check the error and re-call the same RPC handler with a streaming flag, which then returns the response as a stream instead. For me this works because large requests and responses are an edge case, and the happy path defaults to no streaming 99.999% of the time.
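(Ahz's posted code further down only covers the streaming-response half, so here is a minimal sketch of what the chunked-arguments half might look like. Every name here — receiveArgChunk, runWithAssembledArgs, handleLocally — and the chunk size are assumptions for illustration, not his actual API.)

// Caller side: split the serialized payload and send it across several RPC calls.
const CHUNK_SIZE = 900 * 1024; // character count, so an approximation; stays under 1MiB

async function callWithChunkedArgs(env: { SERVICE_BINDING: any }, payload: unknown) {
  const text = JSON.stringify(payload);
  const requestId = crypto.randomUUID();
  const total = Math.ceil(text.length / CHUNK_SIZE);
  for (let i = 0; i < total; i++) {
    const chunk = text.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE);
    await env.SERVICE_BINDING.receiveArgChunk(requestId, i, total, chunk);
  }
  // Final call asks the callee to assemble the chunks and run the real method locally.
  return env.SERVICE_BINDING.runWithAssembledArgs(requestId);
}

// Callee side: buffer chunks per request id, then call the original function locally,
// where the 1MiB limit no longer applies. This assumes the callee keeps in-memory
// state between calls (e.g. a Durable Object, as Ahz mentions later in the thread).
const pendingArgs = new Map<string, string[]>();

function receiveArgChunk(id: string, index: number, total: number, chunk: string) {
  const parts = pendingArgs.get(id) ?? new Array<string>(total);
  parts[index] = chunk;
  pendingArgs.set(id, parts);
}

async function runWithAssembledArgs(id: string) {
  const parts = pendingArgs.get(id) ?? [];
  pendingArgs.delete(id);
  const payload = JSON.parse(parts.join(''));
  return handleLocally(payload); // hypothetical: the original, pre-RPC function
}

declare function handleLocally(payload: unknown): Promise<unknown>;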
Warren
Warren • 3w ago
Do you mind sharing some example code of how you do this? 😀 I wonder if I can adapt this to my problem. I suspect it's a little different because I'm using Cloudflare Workflows, and they try to save the return value from each step of the workflow, which is subject to the 1MiB limit. Hmm.
DaniFoldi
DaniFoldi • 3w ago
Sorry I missed your reply. Unfortunately I don't think there's a good way, since it's stored as a byte stream, not a stringified (base64 or otherwise) version. There's no built-in API for it, since the exact encoding isn't guaranteed and "should be transparent to the user". You can try opening an issue on the workerd repo on GitHub, since it's a fair question in my opinion, and the team might have a better answer for you.
Warren
Warren • 3w ago
All good! Do you mind elaborating on the "should be transparent to the user" part? That is, we should not have to think about this?
Ahz
Ahz • 2w ago
My code isn't really suited to posting in full, but this is the key part that handles stream processing on the caller side. Note that the closure I pass in is the actual RPC call.
export async function withStreamProcessing(
  operation: (streaming: boolean) => Promise<unknown | ReadableStream<Uint8Array>>,
  logger: any,
  identifier: Record<string, any> = {},
  forceStreaming?: boolean
): Promise<Response> {
  try {
    // If forceStreaming is true, skip the non-streaming attempt
    if (!forceStreaming) {
      // First attempt without streaming
      const result = await operation(false);
      return new Response(JSON.stringify(result), {
        headers: { 'Content-Type': 'application/json' }
      });
    }

    // Direct streaming or fallback after error
    const streamResult = await operation(true);
    if (!(streamResult instanceof ReadableStream)) {
      return new Response(JSON.stringify(streamResult));
    }

    // Return the stream directly
    return new Response(streamResult, {
      headers: { 'Content-Type': 'application/json' }
    });

  } catch (error) {
    logger.debug('Operation error', { error, forceStreaming, ...identifier });

    // Check if it's the RPC serialization error and we haven't tried streaming yet
    if (!forceStreaming &&
        error instanceof Error &&
        error.message.includes('Serialized RPC arguments or return values are limited')) {
      logger.warn('Retrying with streaming due to size limitation', { ...identifier });
      // Recursively call with forceStreaming
      return withStreamProcessing(operation, logger, identifier, true);
    }

    logger.error('Operation failed:', { error, forceStreaming, ...identifier });
    throw error;
  }
}
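(For context, a hedged usage sketch of the helper above. The binding name SERVICE_BINDING, the rpcMethod signature, and the way the streaming flag is passed are assumptions, since the actual call site isn't shown.)

// Usage sketch: wrap the RPC call in a closure so oversized responses retry as a stream.
async function queryOtherWorker(env: { SERVICE_BINDING: any }, query: string, logger: Console) {
  const response = await withStreamProcessing(
    (streaming) => env.SERVICE_BINDING.rpcMethod(query, streaming),
    logger
  );
  // createTextStream (below) streams the JSON text of the whole result,
  // so json() works for both the plain and the streamed path.
  return response.json();
}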
On the callee side, I wrap my response in something like this if I want to stream it:
export function createTextStream(
  results: any,
  chunkSize: number = 1000 * 1000, // ~1 MB default, just under the 1MiB limit
  logger: any
): ReadableStream<Uint8Array> {
  const text = JSON.stringify(results);
  const encoder = new TextEncoder();

  return new ReadableStream({
    start(controller) {
      let offset = 0;
      while (offset < text.length) {
        // Slices by character count, so encoded chunk sizes are approximate
        const chunk = text.slice(offset, offset + chunkSize);
        controller.enqueue(encoder.encode(chunk)); // Encode the text chunk
        offset += chunkSize;
        // logger.debug('enqueued', { offset });
      }
      // logger.debug('closing stream');
      controller.close();
      // logger.debug('closed stream');
    }
  });
}
which, in my case, is called on the RPC callee side in the response handler:
const response = stream
  ? createTextStream(finalResults, this.conf.RPC_STREAM_CHUNK_SIZE, this.logger)
  : finalResults;

return response;
For added efficiency, I save the response in memory on the callee side so I don't have to reprocess it. My RPC callee is a Durable Object, which keeps that state in memory for a few seconds, and that's enough for me (a rough sketch of that caching is below).
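(A minimal sketch of that kind of in-memory caching on a Durable Object callee, reusing the createTextStream helper above. The class, field, and method names and the ~10-second expiry are illustrative assumptions, not Ahz's code.)

import { DurableObject } from 'cloudflare:workers';

// Sketch: cache the last computed result in instance memory so the streaming retry
// doesn't recompute it. computeResults and ResultsObject are hypothetical names.
export class ResultsObject extends DurableObject {
  private cachedResults: { key: string; value: unknown; expires: number } | null = null;

  async rpcMethod(query: string, streaming = false) {
    const now = Date.now();
    let results: unknown;

    if (this.cachedResults && this.cachedResults.key === query && this.cachedResults.expires > now) {
      // Reuse the result computed by the failed non-streaming attempt
      results = this.cachedResults.value;
    } else {
      results = await this.computeResults(query);
      this.cachedResults = { key: query, value: results, expires: now + 10_000 }; // ~10s window
    }

    return streaming ? createTextStream(results, 1000 * 1000, console) : results;
  }

  private async computeResults(query: string): Promise<unknown> {
    // ... the actual DB / processing work goes here ...
    return [];
  }
}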
Warren
Warren • 2w ago
Thank you this is very helpful!
