Hey team, whats up?

Hey team, whats up? I have a use case for queues and want to know if my logic is correct: - I have a producer that posts around 500 messages at once on the queue, using batches of 25 messages - All batches are delayed incrementally until the last batch on increments of 30 seconds each batch (first batch for 0 seconds, the second for 30 seconds, the third for 60 seconds...) I was doing this process previously with a D1 column for the queue and a scheduled workers that runs each minute, but wanted to try it on using queues The consumer should receive the batches following the delays, right? The problem I'm facing: All the batches get processed at once, not obeying the delay sent while creating the batches. I need this delay on the batches due to Slack API limitation that I use to process these messages, avoiding receiving 429s. Is there any problem on the CF side causing this problem, or the queue will process all messages at once even with the delays? Here are some snippets so you can take a look at my code: queue definition:
[[queues.producers]]
queue = "queue"
binding = "QUEUE"

[[queues.consumers]]
queue = "queue"
max_batch_size = 25
max_batch_timeout = 30
max_concurrency = 1
[[queues.producers]]
queue = "queue"
binding = "QUEUE"

[[queues.consumers]]
queue = "queue"
max_batch_size = 25
max_batch_timeout = 30
max_concurrency = 1
producer:
let delaySeconds = 0
while (slackUsers.length) {
console.log(delaySeconds)
await triggerMessageQueue.sendBatch(
slackUsers.splice(0, 25).map(({ id, dmChannel }) => ({
body: {
slackUserId: id,
slackUserDmChannel: dmChannel,
awarenessMessageId
}
})),
{ delaySeconds: delaySeconds }
)
delaySeconds += 30
}
let delaySeconds = 0
while (slackUsers.length) {
console.log(delaySeconds)
await triggerMessageQueue.sendBatch(
slackUsers.splice(0, 25).map(({ id, dmChannel }) => ({
body: {
slackUserId: id,
slackUserDmChannel: dmChannel,
awarenessMessageId
}
})),
{ delaySeconds: delaySeconds }
)
delaySeconds += 30
}
consumer:
export const triggerMessageQueueConsumer = async (
batch: MessageBatch<QueueMessage>,
env: QueueEnv
) => {
console.log(`${new Date().toISOString()} Executing batch...`)
console.log(`Received ${batch.messages.length}`)
for (const message of batch.messages) {
console.log(message.timestamp, message.attempts)
message.ack()
}
}
export const triggerMessageQueueConsumer = async (
batch: MessageBatch<QueueMessage>,
env: QueueEnv
) => {
console.log(`${new Date().toISOString()} Executing batch...`)
console.log(`Received ${batch.messages.length}`)
for (const message of batch.messages) {
console.log(message.timestamp, message.attempts)
message.ack()
}
}
10 Replies
Julio Ribeiro
Julio RibeiroOP6mo ago
My account ID: 7510554ea7168a839870cd38eca50fc7 My queue ID: dd267ab1486c49fca646d718a13444db
Pranshu Maheshwari
thanks, your code looks ok! there's a chance this is a bug on our side. let me dig into this a bit further
Julio Ribeiro
Julio RibeiroOP6mo ago
thanks!
Pranshu Maheshwari
we're still looking into this one btw! haven't found the issue yet, but we're going to try a couple of things out. could you do me a favor & keep writing messages to this queue?
Ahz
Ahz6mo ago
generally a safer practice to assume queues could dumped on / overloaded for X reasons and not let it DDoS yourself by limiting the consumption within your control
Julio Ribeiro
Julio RibeiroOP5mo ago
hey there. sorry for the delay. the queue is prodcing and consuming once again. looks like the delay problem still persists, all messages with delays are being consumed at once Any news on this one?
Pranshu Maheshwari
we're still looking into this! haven't found the issue yet hey! we've found the issue here. we're pushing out a fix!
Julio Ribeiro
Julio RibeiroOP5mo ago
Nice! Thanks! Give me a heads up when it is pushed so I can test it again
kcbrewron
kcbrewron5mo ago
Given it's a queue mechanism, conceptually, I agree with some, you should expect it to be overrun. Two ideas though, on the consumer side, retry with a delay if you received a failure. Have you tried setting your delay in the producer configuration. [[queues.producers]] binding = "<BINDING_NAME>" queue = "<QUEUE-NAME>" delivery_delay = 60 # delay every message delivery by 1 minute
Pranshu Maheshwari
should be resolved!

Did you find this page helpful?