Is there a way to force a queue to increase concurrency, or to set a minimum concurrency?

Is there a way to force a queue to increase concurrency, or to set a minimum concurrency?

I'm seeing the following behaviour: I have a queue that receives a message only once every 1-5 minutes. A message takes about 30 seconds of wall time to process. Maximum consumer concurrency is left at the default. But when a message comes in that suddenly takes 30 minutes to process, the whole queue is stuck until that one message is processed (that's the behaviour I'm observing, at least).

This is in line with the docs, since Queues only checks whether the number of concurrent consumers should be adjusted after a batch of messages has been processed. It's very annoying, of course: because of one slow message, I suddenly have messages waiting in the queue for 30 minutes. Only after the slow message is processed is the concurrency updated and the waiting messages handled. Being able to set a minimum concurrency would be a big help here.
15 Replies
John Spurlock (6mo ago)
Anyone else see queue send failures across dozens of colos at around 2024-08-26 06:21:46 UTC?
Error: Queue send failed: Internal Server Error
Haven't seen anything across so many colos at once like this before.
elithrar (6mo ago)
@Pranshu Maheshwari FWIW I don't see anything in top-level metrics for Queues — no meaningful dip in write or read throughput around 6:21 UTC or increase in 5xx error ratio. Pranshu can dig in with the team.
John Spurlock (6mo ago)
ok thanks, it was a relatively short duration (ended 06:22:09), but it seems like essentially all sends failed during that time (these are after client-side retries). The worker's connection to the rest of CF was ok (I was able to fall back to saving msgs to a DO for later playback).
ウゴ (6mo ago)
I'll take the liberty of linking that message here since it might be related to queues: https://discord.com/channels/595317990191398933/779390076219686943/1277962186135834635
Julio Ribeiro (6mo ago)
Hey team, what's up? I have a use case for Queues and want to know if my logic is correct:
- I have a producer that posts around 500 messages at once to the queue, in batches of 25 messages.
- Each batch is delayed 30 seconds more than the previous one (0 seconds for the first batch, 30 for the second, 60 for the third, ...).

I was previously doing this with a D1 column acting as the queue and a scheduled Worker that runs every minute, but wanted to try Queues instead. The consumer should receive the batches according to their delays, right?

The problem I'm facing: all the batches get processed at once, ignoring the delay set when the batches were sent. I need the delay between batches because of rate limits on the Slack API that I use to process these messages, so that I avoid 429s. Is there a problem on the CF side causing this, or will the queue process all messages at once even with the delays?

Here are some snippets so you can take a look at my code:
queue definition:
[[queues.producers]]
queue = "queue"
binding = "QUEUE"

[[queues.consumers]]
queue = "queue"
max_batch_size = 25
max_batch_timeout = 30
max_concurrency = 1
producer:
let delaySeconds = 0
while (slackUsers.length) {
  console.log(delaySeconds)
  // Send the next 25 users as one batch, delayed by the current offset.
  await triggerMessageQueue.sendBatch(
    slackUsers.splice(0, 25).map(({ id, dmChannel }) => ({
      body: {
        slackUserId: id,
        slackUserDmChannel: dmChannel,
        awarenessMessageId
      }
    })),
    { delaySeconds }
  )
  // Each following batch waits 30 seconds longer than the previous one.
  delaySeconds += 30
}
consumer:
export const triggerMessageQueueConsumer = async (
  batch: MessageBatch<QueueMessage>,
  env: QueueEnv
) => {
  console.log(`${new Date().toISOString()} Executing batch...`)
  console.log(`Received ${batch.messages.length}`)
  for (const message of batch.messages) {
    console.log(message.timestamp, message.attempts)
    message.ack()
  }
}
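For reference, the delay schedule described in the message above can be checked in isolation. The following is only a sketch: `planDelayedBatches` is a hypothetical helper, not part of the Queues API, and it sends nothing. It just shows that 500 messages in batches of 25 with a 30-second step yield 20 batches, the last one delayed by 570 seconds.

```typescript
// Hypothetical helper (not part of the Queues API): plans batches and their
// delays without sending anything.
interface PlannedBatch<T> {
  delaySeconds: number;
  items: T[];
}

function planDelayedBatches<T>(
  items: readonly T[],
  batchSize: number,
  stepSeconds: number
): PlannedBatch<T>[] {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const plan: PlannedBatch<T>[] = [];
  for (let start = 0; start < items.length; start += batchSize) {
    plan.push({
      // Batch index (0, 1, 2, ...) times the step gives 0, 30, 60, ...
      delaySeconds: (start / batchSize) * stepSeconds,
      items: items.slice(start, start + batchSize),
    });
  }
  return plan;
}

// 500 placeholder users, batches of 25, 30 seconds between batches.
const users = Array.from({ length: 500 }, (_, i) => `user-${i}`);
const plan = planDelayedBatches(users, 25, 30);
console.log(plan.length);                        // 20
console.log(plan[plan.length - 1].delaySeconds); // 570
```

Each planned batch could then be handed to `sendBatch` with `{ delaySeconds: batch.delaySeconds }`, as in the producer snippet above. Unlike `splice`, this version leaves the input array untouched.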
Conner (6mo ago)
I think I might be seeing some messages dropped from my queue? Some final data in our db is missing and I traced it back to this.
- Cloudflare dashboard shows 321 messages successfully delivered
- Logpush logs only show 313 events from that queue

All of these messages were sent within the same few seconds/minute. What happened to the other 8 messages?
gwapes (6mo ago)
Hi, new to queues. I'm doing this:
await env.queue.send(
  data,
  { delaySeconds: 42300, contentType: "json" }
).then(() => console.log("done"));
However, whenever I list messages on the dashboard, nothing shows, even though "done" shows in the live logs. Am I doing something wrong? Smaller delays work though.
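To put the delay value in the snippet above into perspective, here is a small sketch that converts it to hours and minutes and compares it with a maximum delay. The 12-hour (43,200-second) maximum is an assumption taken from memory of the Queues limits and should be checked against the current documentation; `describeDelay` is a hypothetical helper, not a Queues API.

```typescript
// Assumption: the maximum message delay is 12 hours. Verify against the
// current Queues limits before relying on this value.
const ASSUMED_MAX_DELAY_SECONDS = 12 * 60 * 60; // 43200

interface DelayDescription {
  hours: number;
  minutes: number;
  seconds: number;
  withinAssumedLimit: boolean;
}

// Hypothetical helper: breaks a delay into h/m/s and checks the assumed limit.
function describeDelay(delaySeconds: number): DelayDescription {
  if (!Number.isInteger(delaySeconds) || delaySeconds < 0) {
    throw new RangeError("delaySeconds must be a non-negative integer");
  }
  return {
    hours: Math.floor(delaySeconds / 3600),
    minutes: Math.floor((delaySeconds % 3600) / 60),
    seconds: delaySeconds % 60,
    withinAssumedLimit: delaySeconds <= ASSUMED_MAX_DELAY_SECONDS,
  };
}

const d = describeDelay(42300);
console.log(`${d.hours}h ${d.minutes}m ${d.seconds}s`); // 11h 45m 0s
console.log(d.withinAssumedLimit);                      // true
```

So 42300 seconds is 11 hours 45 minutes, which sits just under the assumed maximum.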
johtso (6mo ago)
Shouldn't this page document the attempts property on a Message? https://developers.cloudflare.com/queues/configuration/javascript-apis/ I wanted to quickly check whether the value is 0 or 1 the first time a message is received, but saw it wasn't documented at all.
johtso (6mo ago)
GitHub
Queues: Message.attempts is undocumented · Issue #16542 · cloudflar...
Proposed changes Document the Messages.attempts property here: https://developers.cloudflare.com/queues/configuration/javascript-apis/ Subject Matter Message.attempts property of a queue message Co...
lusine (5mo ago)
Hi team, I need to use event notifications with R2 and queues when new objects are created. I have a job written in C#/.NET which is supposed to read messages from this queue. My question is: do you have a .NET SDK that I can use for receiving/acknowledging messages from queues? For example, I think R2 is compatible with AWS S3 SDK and I can use AmazonS3Client with R2, is that the case with queues? Can I use AmazonSqsClient with Cloudflare queues? Or what other options do I have?
Pranshu Maheshwari
yes!
elithrar (5mo ago)
You can pull via HTTP. We don’t have a .NET SDK, and the SQS API is far less of a de facto standard than S3, so we have no plans to support it. https://developers.cloudflare.com/queues/configuration/pull-consumers/
Cloudflare Docs
Pull consumers | Cloudflare Queues
A pull-based consumer allows you to pull from a queue over HTTP from any environment and/or programming language outside of Cloudflare Workers. A pull-based consumer can be useful when your message consumption rate is limited by upstream infrastructure or long-running tasks.
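As a rough illustration of what pulling over HTTP involves, the sketch below only builds the two requests (pull, then acknowledge) without sending them. The endpoint paths and the body field names (`batch_size`, `visibility_timeout_ms`, `acks`, `lease_id`) are assumptions based on my recollection of the pull-consumers page linked above and should be verified there; the account ID, queue ID, and token are placeholders. The same two POST requests could be issued from any HTTP client, including .NET's `HttpClient`.

```typescript
// Assumption: base URL and paths as recalled from the pull-consumer docs.
const API_BASE = "https://api.cloudflare.com/client/v4";

interface HttpRequestSpec {
  url: string;
  init: { method: string; headers: Record<string, string>; body: string };
}

function jsonPost(url: string, apiToken: string, payload: unknown): HttpRequestSpec {
  return {
    url,
    init: {
      method: "POST",
      headers: {
        "content-type": "application/json",
        authorization: `Bearer ${apiToken}`,
      },
      body: JSON.stringify(payload),
    },
  };
}

// Builds the request that asks the queue for up to `batchSize` messages.
function buildPullRequest(
  accountId: string,
  queueId: string,
  apiToken: string,
  batchSize = 10,
  visibilityTimeoutMs = 30000
): HttpRequestSpec {
  const url = `${API_BASE}/accounts/${accountId}/queues/${queueId}/messages/pull`;
  return jsonPost(url, apiToken, {
    batch_size: batchSize,
    visibility_timeout_ms: visibilityTimeoutMs,
  });
}

// Builds the request that acknowledges messages by the lease IDs
// returned from a previous pull.
function buildAckRequest(
  accountId: string,
  queueId: string,
  apiToken: string,
  leaseIds: string[]
): HttpRequestSpec {
  const url = `${API_BASE}/accounts/${accountId}/queues/${queueId}/messages/ack`;
  return jsonPost(url, apiToken, {
    acks: leaseIds.map((lease_id) => ({ lease_id })),
  });
}

const pull = buildPullRequest("ACCOUNT_ID", "QUEUE_ID", "API_TOKEN");
console.log(pull.url);
```

In a runtime with `fetch`, a request would be sent with `fetch(pull.url, pull.init)`; keeping request construction separate from sending makes the shape of each call easy to inspect and port to another language.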
Hard@Work (5mo ago)
?crossposting