Murder Chicken
Murder Chicken
CDCloudflare Developers
Created by Murder Chicken on 9/27/2024 in #workers-help
RPC, `using`, Experimental Wrangler and Observability
Are you awaiting the method call?
9 replies
CDCloudflare Developers
Created by Murder Chicken on 9/27/2024 in #workers-help
RPC, `using`, Experimental Wrangler and Observability
In my use-case, I'm using Service Bindings from within a scheduled worker... IIRC they mention something about being limited to fetch handlers but it seems to work for me 🤷‍♂️
9 replies
CDCloudflare Developers
Created by Murder Chicken on 9/27/2024 in #workers-help
RPC, `using`, Experimental Wrangler and Observability
So far, the approach I used above seems to work. The only odd behavior I'm seeing is duplicate log entries on the Service Worker housing the RPC methods.
9 replies
CDCloudflare Developers
Created by Murder Chicken on 9/27/2024 in #workers-help
RPC, `using`, Experimental Wrangler and Observability
Unclear but it seems to deploy and work whereas use of usingrequires a different wrangler so it understands how to transpile using statements into Symbol.dispose statements.
9 replies
CDCloudflare Developers
Created by Murder Chicken on 9/27/2024 in #workers-help
RPC, `using`, Experimental Wrangler and Observability
I'm attempting to solve this without the experimental wrangler using the following code:
/**
* Executes an RPC call and ensures proper cleanup.
* @param {Function} rpcCall - A function that performs the RPC call.
* @returns {Promise<any>} - The result of the RPC call.
*/
export async function executeRpcWithCleanup(rpcCall) {
let rpcResult = null;
try {
rpcResult = await rpcCall();
return rpcResult;
} finally {
if (rpcResult && typeof rpcResult[Symbol.dispose] === 'function') {
console.info('Disposing RPC result');
rpcResult[Symbol.dispose]();
} else {
console.info('No Symbol.dispose method on rpcResult');
}
}
}
/**
* Executes an RPC call and ensures proper cleanup.
* @param {Function} rpcCall - A function that performs the RPC call.
* @returns {Promise<any>} - The result of the RPC call.
*/
export async function executeRpcWithCleanup(rpcCall) {
let rpcResult = null;
try {
rpcResult = await rpcCall();
return rpcResult;
} finally {
if (rpcResult && typeof rpcResult[Symbol.dispose] === 'function') {
console.info('Disposing RPC result');
rpcResult[Symbol.dispose]();
} else {
console.info('No Symbol.dispose method on rpcResult');
}
}
}
... and executing the method like this:
const response = await executeRpcWithCleanup(() => env.SERVICE.myRpcMethod());
const response = await executeRpcWithCleanup(() => env.SERVICE.myRpcMethod());
However, I'm seeing odd behavior in that the RPC methods seem to be being called twice when I view the SERVICE worker's logs. When I tried an implementation of the using method, I did not see this. Not sure what's happening but anything that might cause RPC methods to be called multiple times using a single invocation of the method?
9 replies
CDCloudflare Developers
Created by Shravan on 9/11/2024 in #workers-discussions
I seem to have issues compiling with
Were you ever able to determine if there was a way to do this through flags?
6 replies
CDCloudflare Developers
Created by Murder Chicken on 9/26/2024 in #coding-help
If I have two "headless" workers (one
Managed to figure this out. For anyone interested: https://discordapp.com/channels/595317990191398933/1288258448626290771
2 replies
CDCloudflare Developers
Created by Murder Chicken on 9/24/2024 in #workers-help
Serialized RPC arguments or return values are limited to 1MiB
For anyone who stumbles into this thread... I managed to figure this out (thanks Cursor IDE). Worker making the RPC call with a large payload...
const data = [[data], [data], [data], ...];
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
data.forEach(chunk => {
const bytes = encoder.encode(JSON.stringify(chunk) + '\n');
controller.enqueue(bytes);
});
controller.close();
}
});

// Send to other worker RPC method through Service Binding...
await env.SERVICE_BINDING.rpcMethod(stream);
const data = [[data], [data], [data], ...];
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
data.forEach(chunk => {
const bytes = encoder.encode(JSON.stringify(chunk) + '\n');
controller.enqueue(bytes);
});
controller.close();
}
});

// Send to other worker RPC method through Service Binding...
await env.SERVICE_BINDING.rpcMethod(stream);
Consumer worker with RPC method...
async rpcMethod(stream) {
const reader = stream.getReader();
const decoder = new TextDecoder();
let buffer = '';
let data = [];

try {
while (true) {
let { done, value } = await reader.read().catch(error => {
console.warn('Stream read error:', error);
return { done: true, value: undefined };
});

if (done) {
if (buffer) {
try {
data.push(JSON.parse(buffer));
} catch (error) {
console.warn('Error parsing remaining buffer:', error);
}
}
break;
}

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop();

for (const line of lines) {
if (line) {
try {
data.push(JSON.parse(line));
} catch (error) {
console.warn('Error parsing JSON:', error, 'Line:', line);
}
}
}
}
} catch (error) {
console.error('Stream processing error:', error);
} finally {
reader.releaseLock();
}

// Do whatever with the data
}
async rpcMethod(stream) {
const reader = stream.getReader();
const decoder = new TextDecoder();
let buffer = '';
let data = [];

try {
while (true) {
let { done, value } = await reader.read().catch(error => {
console.warn('Stream read error:', error);
return { done: true, value: undefined };
});

if (done) {
if (buffer) {
try {
data.push(JSON.parse(buffer));
} catch (error) {
console.warn('Error parsing remaining buffer:', error);
}
}
break;
}

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop();

for (const line of lines) {
if (line) {
try {
data.push(JSON.parse(line));
} catch (error) {
console.warn('Error parsing JSON:', error, 'Line:', line);
}
}
}
}
} catch (error) {
console.error('Stream processing error:', error);
} finally {
reader.releaseLock();
}

// Do whatever with the data
}
5 replies
CDCloudflare Developers
Created by Murder Chicken on 9/26/2024 in #coding-help
If I have two "headless" workers (one
The data structure is simple (just a lot of elements) and looks like this:
[
[132323, '2024-12-01', 'VALID'],
[433323, '2024-12-02', 'INVALID'],
[345233, '2024-12-03', 'VALID'],
... (50k + records)
]
[
[132323, '2024-12-01', 'VALID'],
[433323, '2024-12-02', 'INVALID'],
[345233, '2024-12-03', 'VALID'],
... (50k + records)
]
2 replies
CDCloudflare Developers
Created by Murder Chicken on 9/24/2024 in #workers-help
Serialized RPC arguments or return values are limited to 1MiB
Also, the documentation notes that the Stream API is only available within a fetch event. Do I have this or other options within a scheduled event?
5 replies
CDCloudflare Developers
Created by Murder Chicken on 9/24/2024 in #workers-help
Serialized RPC arguments or return values are limited to 1MiB
@Purple Blob thanks for the input. Are you aware of any code examples of how to do with with RPC service bindings?
5 replies