Hello, please help. Even though the batch size is 1, when I send a message to the queue with a delay of 5 minutes, the message is not delivered to my consumer after that time has passed. Is it only delivered when I add another message to the queue?
flof.fly 2mo ago
How do I create one queue for a Worker in a test env and another queue for a Worker in prod? I'm a bit confused about how multi-env works in Wrangler.
kian 2mo ago
[[env.???.queues.consumers]]/[[env.???.queues.producers]]
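As a rough illustration of the pattern above, a wrangler.toml could look like the sketch below. The environment names (`staging`, `production`), the queue names, and the binding name `MY_QUEUE` are hypothetical placeholders, not taken from this thread.

```toml
# Hypothetical wrangler.toml fragment: one queue per environment.
# Env names, queue names, and the binding name are placeholders.

[[env.staging.queues.producers]]
queue = "my-queue-staging"
binding = "MY_QUEUE"

[[env.staging.queues.consumers]]
queue = "my-queue-staging"
max_batch_size = 10

[[env.production.queues.producers]]
queue = "my-queue-production"
binding = "MY_QUEUE"

[[env.production.queues.consumers]]
queue = "my-queue-production"
max_batch_size = 10
```

With a layout like this, `wrangler deploy --env staging` would bind the Worker to the staging queue. The queues themselves still need to exist first, for example via `wrangler queues create`.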
flof.fly 2mo ago
Nocapping thanks a lot kian
pao ramen 2mo ago
Hello! I have a very mysterious bug with queues. I have a job that iterates over a batch of 3 messages, but it only processes 2 of them and never gets to the third:
export async function queue(batch, env) {
  console.log({ messages: JSON.stringify(batch.messages) }) // This logs an array of 3 objects

  for (const msg of batch.messages) {
    console.log({ message: JSON.stringify(msg) }) // This only logs twice

    try {
      // ...do things
      msg.ack()
    } catch (e) {
      console.error(`adjustUser error: ${e}`) // This never logs
      msg.retry()
    }
  }
}
This is my wrangler configuration:
[[queues.producers]]
queue = "adjust-user"
binding = "ADJUST_USER_QUEUE"

[[queues.consumers]]
queue = "adjust-user"
max_batch_size = 10
max_batch_timeout = 5
retry_delay = 600
dead_letter_queue = "failed-adjust-user"
Any clue?
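One way to narrow this kind of symptom down is sketched below. It assumes, without evidence from the thread, that the elided "...do things" step awaits something that never settles, which would stall the loop without ever reaching the `catch`. `withTimeout`, `handleBatch`, and `processMessage` are hypothetical names introduced for illustration, and the 10-second default is an arbitrary choice.

```javascript
// Hypothetical helper: rejects if `promise` has not settled within `ms`.
function withTimeout(promise, ms, label) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${ms}ms`)),
      ms
    );
  });
  // Whichever settles first wins; the timer is cleared either way.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// `processMessage` stands in for the elided "...do things" step.
async function handleBatch(batch, processMessage, timeoutMs = 10_000) {
  for (const msg of batch.messages) {
    try {
      await withTimeout(processMessage(msg), timeoutMs, `message ${msg.id}`);
      msg.ack();
    } catch (e) {
      // A hung step now surfaces here instead of silently stalling the loop.
      console.error(`adjustUser error: ${e}`);
      msg.retry();
    }
  }
}
```

If the timeout error shows up in the logs for the second message, the stall is inside the processing step rather than in the queue delivery itself. If it does not, this assumption is wrong and the cause lies elsewhere.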
Saqib (Langbase)
Hey folks, I am trying to do some processing with Queues, and surprisingly the consumer times out after 60s. I'm on the paid plan and currently testing locally. Any thoughts? It just stops processing: no error, nothing.
toadfans 2mo ago
hello, is queue down now?
{

"name": "Error",
"message": "Queue send failed: Internal Server Error",
"stack": "Error: Queue send failed: Internal Server Error\n at async Object.mutation (bundledWorker-0.9232964110947641.mjs:9562:3)\n at async Object.handler (bundledWorker-0.9232964110947641.mjs:28094:17)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async r (bundledWorker-0.9232964110947641.mjs:4210:92)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async Object.handler (bundledWorker-0.9232964110947641.mjs:14535:28)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async r (bundledWorker-0.9232964110947641.mjs:4210:92)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async Object.handler (bundledWorker-0.9232964110947641.mjs:8361:42)"
}
requestId: 8cfd3fdf8f3984b7
Lance 2mo ago
Hi, has anyone else run into an issue where the queue's backlog just keeps growing and no messages are delivered to the Worker handler? Nothing changed on the code side. It suddenly started a few hours ago.
Sam 2mo ago
Any update on when queues are coming to remote mode (https://github.com/cloudflare/workerd/issues/855)? Or is it possible to run queues locally and my other bindings remotely?
ac 5w ago
Hi, I am having issues using a Pages deployment as a queue producer. It doesn't throw any error, and the service binding for the Queue is there for the Pages app, but the queue never gets a message even though the enqueueing code is definitely getting called. I did notice that on the queue, the "Producer" shows a production and a preview deployment but they just have a generic pages-worker-###-production format and aren't using the name of my Pages app
dangoldin 4w ago
Hopefully a quick question: I'm doing some local dev and wanted to confirm my queues are set up properly, but I don't see anything queue-related in the .wrangler/state/v3/ path (it has cache and d1). Should there be anything created there? I have a Remix app that publishes to a queue and a separate Worker that consumes from it, so I'm trying to figure out why the Worker isn't seeing the messages being published.
John Spurlock 4w ago
+1, even a notion like sqs's message-group-id as a scaling worker consumer hint, not for fifo (since cf doesn't do fifo)
Pranshu Maheshwari
thanks for the feedback here! very detailed, and agreed that this is a pain point with queues today 🙂 we're thinking about ways to improve this. our plan right now is to add the ability to connect multiple consumers to a single queue. you'll be able to specify partition keys to route specific messages to specific consumers, and ideally this can be dynamic, so you don't have to specify all the rules ahead of time. this project is still in the design phase though, so it's too soon to share timelines. dynamically creating queues doesn't work well today, alas! rather than 1 queue per user, would you be able to set up a few high-priority queues and separate low-priority queues? by default, messages get routed to the high-priority queues. if a user uploads 10k files at once, you route those messages to the low-priority queues so that they don't block other users' messages.
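The high/low priority routing suggested above could be sketched roughly as follows. The binding names `HIGH_PRIORITY_QUEUE` and `LOW_PRIORITY_QUEUE`, the `BULK_THRESHOLD` value, and the function names are all hypothetical. The chunk size of 100 reflects the per-call `sendBatch` limit as I understand it, so treat it as an assumption and check the current Queues limits.

```javascript
// Hypothetical cutoff: uploads larger than this go to the low-priority queue.
const BULK_THRESHOLD = 1000;
// Assumed maximum number of messages per sendBatch call.
const MAX_BATCH = 100;

// Returns the name of the (hypothetical) producer binding to use.
function pickQueueName(messageCount, threshold = BULK_THRESHOLD) {
  return messageCount > threshold ? "LOW_PRIORITY_QUEUE" : "HIGH_PRIORITY_QUEUE";
}

// Enqueues one message per file for a user, in chunks of MAX_BATCH.
async function enqueueForUser(env, userId, files, threshold = BULK_THRESHOLD) {
  const queue = env[pickQueueName(files.length, threshold)];
  for (let i = 0; i < files.length; i += MAX_BATCH) {
    const chunk = files.slice(i, i + MAX_BATCH);
    // sendBatch takes an array of objects, each with a `body` field.
    await queue.sendBatch(chunk.map((file) => ({ body: { userId, file } })));
  }
}
```

Each queue would then have its own consumer, so a 10k-file upload only delays other bulk uploads and not the default traffic.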
Brett Willis 4w ago
Any plans to support pushing to Queues from an external platform directly via the REST API (not via an intermediate Worker)?
Pranshu Maheshwari
We published a deep dive on the Queues architecture today! https://blog.cloudflare.com/how-we-built-cloudflare-queues/
Learn how we built Cloudflare Queues using our own Developer Platform and how it evolved to a geographically-distributed, horizontally-scalable architecture built on Durable Objects. Our new architecture supports over 10x more throughput and over 3x lower latency compared to the previous version.