Ideally queues could allows us to list pending messages

Ideally queues could allows us to list pending messages
14 Replies
piffie
piffie6mo ago
is there a way of getting the name of the queue, when you have the queue object?
clearwater4227
clearwater42276mo ago
Hi everyone! I am starting looking into using queues but have one question...can I configure a worker script as a queue consumer through terraform? I havent found the setup yet..still looking but if anyone here knows or can point me to the info... it looks like I can configure the queue reference on a worker to send a message but I need to configure the consumer through terraform also. or is it just a worker with the queue() function? hmm...Ill try it hmmm...its not picking up my worker as a consumer... is there a way to configure a queue consumer through terraform?
piffie
piffie6mo ago
never used terraform for it - sorry
berkinovish
berkinovish6mo ago
Is there a way to force a queue to increase concurrency, or to set a minimum concurrency? I'm seeing the following behaviour now: I have a queue where only once every 1-5 minutes, a message is queued. A message takes about 30 seconds wall time to be processed. Maximum consumer concurrency is on default. But, when a message comes in that all of a sudden takes 30 minutes to process, the whole queue will be stuck until that one message is processed (that's the behaviour I'm observing at least). This behaviour is in line with the docs, since Queues will only check after processing a batch of messages if the concurrent consumers should be adjusted. Very annoying behaviour of course - because of a slow message, I suddenly have messages waiting for 30 minutes in the queue. Only after the message is processed, the concurrency will be updated and all waiting messages will be handled. Being able to set a minimum concurrency would be a big help here
John Spurlock
John Spurlock6mo ago
anyone else see queue send failures across of dozens of colos at around 2024-08-26 06:21:46 UTC? Error: Queue send failed: Internal Server Error Haven't seen anything across so many colos at once like this before
elithrar
elithrar6mo ago
@Pranshu Maheshwari FWIW I don't see anything in top-level metrics for Queues — no meaningful dip in write or read throughput around 6:21 UTC or increase in 5xx error ratio. Pranshu can dig in with the team.
John Spurlock
John Spurlock6mo ago
ok thanks, it was a relatively short duration (ended 06:22:09, but seems like essentially all sends failed (these are after client-side retries) during that time - worker connection to the rest of cf was ok (I was able to fallback to saving msgs to DO for later playback)
ウゴ
ウゴ6mo ago
i take the liberty to link that message here since it might be related to queues https://discord.com/channels/595317990191398933/779390076219686943/1277962186135834635
Julio Ribeiro
Julio Ribeiro6mo ago
Hey team, whats up? I have a use case for queues and want to know if my logic is correct: - I have a producer that posts around 500 messages at once on the queue, using batches of 25 messages - All batches are delayed incrementally until the last batch on increments of 30 seconds each batch (first batch for 0 seconds, the second for 30 seconds, the third for 60 seconds...) I was doing this process previously with a D1 column for the queue and a scheduled workers that runs each minute, but wanted to try it on using queues The consumer should receive the batches following the delays, right? The problem I'm facing: All the batches get processed at once, not obeying the delay sent while creating the batches. I need this delay on the batches due to Slack API limitation that I use to process these messages, avoiding receiving 429s. Is there any problem on the CF side causing this problem, or the queue will process all messages at once even with the delays? Here are some snippets so you can take a look at my code: queue definition:
[[queues.producers]]
queue = "queue"
binding = "QUEUE"

[[queues.consumers]]
queue = "queue"
max_batch_size = 25
max_batch_timeout = 30
max_concurrency = 1
[[queues.producers]]
queue = "queue"
binding = "QUEUE"

[[queues.consumers]]
queue = "queue"
max_batch_size = 25
max_batch_timeout = 30
max_concurrency = 1
producer:
let delaySeconds = 0
while (slackUsers.length) {
console.log(delaySeconds)
await triggerMessageQueue.sendBatch(
slackUsers.splice(0, 25).map(({ id, dmChannel }) => ({
body: {
slackUserId: id,
slackUserDmChannel: dmChannel,
awarenessMessageId
}
})),
{ delaySeconds: delaySeconds }
)
delaySeconds += 30
}
let delaySeconds = 0
while (slackUsers.length) {
console.log(delaySeconds)
await triggerMessageQueue.sendBatch(
slackUsers.splice(0, 25).map(({ id, dmChannel }) => ({
body: {
slackUserId: id,
slackUserDmChannel: dmChannel,
awarenessMessageId
}
})),
{ delaySeconds: delaySeconds }
)
delaySeconds += 30
}
consumer:
export const triggerMessageQueueConsumer = async (
batch: MessageBatch<QueueMessage>,
env: QueueEnv
) => {
console.log(`${new Date().toISOString()} Executing batch...`)
console.log(`Received ${batch.messages.length}`)
for (const message of batch.messages) {
console.log(message.timestamp, message.attempts)
message.ack()
}
}
export const triggerMessageQueueConsumer = async (
batch: MessageBatch<QueueMessage>,
env: QueueEnv
) => {
console.log(`${new Date().toISOString()} Executing batch...`)
console.log(`Received ${batch.messages.length}`)
for (const message of batch.messages) {
console.log(message.timestamp, message.attempts)
message.ack()
}
}
Conner
Conner6mo ago
I think I might be seeing some messages dropped from my queue? Some final data in our db is missing and I traced it back to this. Cloudflare dashboard shows 321 messages successfully delivered Logpush logs only show 313 events from that queue All of these messages were sent in the same few seconds/min. What happened to the other 8 messages?
gwapes
gwapes6mo ago
Hi, new to queues. I'm doing this:
await env.queue.send(
data,
{ delaySeconds: 42300, contentType: "json" }
).then(() => console.log("done"));
await env.queue.send(
data,
{ delaySeconds: 42300, contentType: "json" }
).then(() => console.log("done"));
However, whenever I view on the dashboard and list messages, nothing shows, however "done" shows in the live logs. Am I doing something wrong? Smaller times work though.
johtso
johtso6mo ago
Shouldn't this page document the attempts property on a Message? https://developers.cloudflare.com/queues/configuration/javascript-apis/ wanted to quickly check if the value is 0 or 1 the first time a message is received, but saw it wasn't documented at all
johtso
johtso6mo ago
GitHub
Queues: Message.attempts is undocumented · Issue #16542 · cloudflar...
Proposed changes Document the Messages.attempts property here: https://developers.cloudflare.com/queues/configuration/javascript-apis/ Subject Matter Message.attempts property of a queue message Co...
Unknown User
Unknown User5mo ago
Message Not Public
Sign In & Join Server To View

Did you find this page helpful?