im currently getting ratelimited by an external service because things that are supposed to be delay

im currently getting ratelimited by an external service because things that are supposed to be delayed (using queues) are all getting run at once
11 Replies
Sam
Sam5mo ago
Hey the time between my message getting sent to queue -> being processed by my working is taking a few seconds before it even starts being processed. Anyway to speed this up?
Sam
Sam5mo ago
No description
Sam
Sam5mo ago
Seems 1000/1s is the minimum value in dashboard UI and cant be set via wrangler.toml
UNOwen
UNOwen5mo ago
Hey folks, fresh Queues user here. I am having issues with queue messages not being processed after they are sent back for a .retry(12 hours), and I can't figure out whether I've mis-configured something or am hitting an edge case. I produce queue messages in batch from my Pages website, it is basically a set of payloads for emails to be sent out, some to be sent out immediately (message has a field after: now.toISO()), and some upon delay (message has a field after: remindOn.toISO(), typically 3-5 days from now). The are 0-3 batches of these messages per day sent out worldwide, as in these are not high volume. The messages are received by a Workers consumer, generated from a standard Cloudflare workers template. The relevant bit is the .retry({delaySeconds: X}) of messages which should not be processed yet: if the after field of the payload message is in the future, I am sending the message back for a retry of up to 12 hours, because it is the documented maximum I found. What happens, is that the messages I've sent for a long retry don't seem to process until there is a fresh new unrelated message added to the queue, which seems to trigger the re-processing of .retry()-ed messages. This doesn't seem to happen globally either, but rather per region, as in that "fresh new unrelated message" will only trigger the re-processing of geographically proximate messages. E.g. I had somebody in Germany visit my website on Sept 11 and fill out the booking form for Sept 16 lesson; they received the "immediate" messages, but the "reminders" got stuck in limbo until Sept 11 until a completely different person somewhere in Russian filled out the form which triggered new messages to be added to the queue. Message sending batch:
await platform.env.bookings.sendBatch([
{ body: { action: 'notify', target: 'student', via: 'email', after: now.toISO(), message: 'requested_lesson', ...booking }},
...2 messages omitted...
message: 'requested_lesson', ...booking }},
{ body: { action: 'notify', target: 'student', via: 'email', after: remindOn.toISO(), message: 'confirm_lesson', ...booking }},
...2 messages omitted...
]);
await platform.env.bookings.sendBatch([
{ body: { action: 'notify', target: 'student', via: 'email', after: now.toISO(), message: 'requested_lesson', ...booking }},
...2 messages omitted...
message: 'requested_lesson', ...booking }},
{ body: { action: 'notify', target: 'student', via: 'email', after: remindOn.toISO(), message: 'confirm_lesson', ...booking }},
...2 messages omitted...
]);
Message receiving:
async queue(batch: MessageBatch<ActionMessage>, env: Env): Promise<void> {
const now = DateTime.now();
for (let message: Message<ActionMessage> of batch.messages) {
const sendAfter = DateTime.fromISO(message.body.after);
if (sendAfter > now) {
const delaySeconds = Math.round(Math.min(
Duration.fromObject({ hours: 12}).as('seconds'),
sendAfter.diff(now).as('seconds')));
message.retry({
delaySeconds: delaySeconds
});
console.info(`message ${message.id} DELAYED: ${JSON.stringify(message.body)} by ${delaySeconds}`);
} else ...
async queue(batch: MessageBatch<ActionMessage>, env: Env): Promise<void> {
const now = DateTime.now();
for (let message: Message<ActionMessage> of batch.messages) {
const sendAfter = DateTime.fromISO(message.body.after);
if (sendAfter > now) {
const delaySeconds = Math.round(Math.min(
Duration.fromObject({ hours: 12}).as('seconds'),
sendAfter.diff(now).as('seconds')));
message.retry({
delaySeconds: delaySeconds
});
console.info(`message ${message.id} DELAYED: ${JSON.stringify(message.body)} by ${delaySeconds}`);
} else ...
And here's the relevant consumer configuration:
[[queues.consumers]]
queue = "litcondit-bookings"
max_batch_size = 10
max_batch_timeout = 5
max_retries = 100
dead_letter_queue = "litcondit-bookings-dlq"
[[queues.consumers]]
queue = "litcondit-bookings"
max_batch_size = 10
max_batch_timeout = 5
max_retries = 100
dead_letter_queue = "litcondit-bookings-dlq"
So far I've tried to work around this issue by having a "ping" message being sent to the queue every 5 minutes, which seems to help a little – the messages in my local region are now not getting stuck (I am located in Canada, and it seems to help in NA), but it doesn't prevent messages from getting into the limbo stage in EU (assuming there aren't more regions by which Queues are partitioned, which I don't know about).
Bibi
Bibi5mo ago
Why don’t you use https://developers.cloudflare.com/queues/configuration/batching-retries/#delay-messages the delaySeconds property when creating the queue message?
Cloudflare Docs
Batching, Retries and Delays | Cloudflare Queues
When configuring a consumer Worker for a queue, you can also define how messages are batched as they are delivered.
Bibi
Bibi5mo ago
Oh that has a limit of 12 hours too But you could acknowledge the message and send it again with delay instead of retrying
UNOwen
UNOwen5mo ago
Re-sending messages might be a viable short-term workaround, but it would only function if sending messages is 100% reliable, and it doesn't seem like this "limbo" behavior is clearly intended – I am either misusing the retry API, or it has an edge case I am hitting. Re-sending the message would side-step the issue without documenting the details for future users (the "limbo" behavior is confusing and unlike other queues I worked with so far) or fixing the edge case.
boywonder350
boywonder3505mo ago
I have the paid pro plan $25/month and the docs says queue consumers can have 15 min cpu time, but a worker I just created is timing out after 30s. If I try to update the cpu time via the dashboard, it says a maximum amount is 30s (30000ms). What is going on here? ^ Seems like this is a bug or something? I am on the "Standard" plan and the worker I am writing is only a queue consumer andn othing else I hope this is not a docs. typo because I wrote a bunch of code with this presumption :/
ajgeiss0702
ajgeiss0702OP5mo ago
i think you got them mixed up. They can have 15 minutes wall clock time. 30s cpu time
No description
boywonder350
boywonder3505mo ago
No description
ajgeiss0702
ajgeiss0702OP5mo ago
oh hm the docs are inconsistant apparently the 30s cpu time is what it actually is. Whether thats intentional or not, who knows

Did you find this page helpful?