maybe a short question on queues: x-posting from workers-help: https://discord.com/channels/595317990191398933/1325280355908976862
sup filistine (OP) · 3w ago
So I basically replaced my consumer logic with:
await new Promise((resolve) => {
  setTimeout(resolve, 60_000);
});
Wrote a k6 test that queues 10 messages to my producer quickly. Got rid of any RPC bindings in the consumer since its handling is essentially a noop. Out of the 10 messages, 1 is delivered at the beginning. Then 60s pass, and another 2 are delivered. The remaining 7 are dropped. This is with:
max_batch_size = 1    # optional: defaults to 10
max_batch_timeout = 0 # optional: defaults to 5 seconds
max_retries = 0       # don't retry for now
Does anyone know why consumers won't scale up to handle the newer messages as needed, and why the system instead drops 70% of the messages? If I change the setTimeout to wait only 3 seconds, this dropping doesn't happen. I can even ramp the k6 test up to 100 messages and the system keeps up: average backlog is only 2 or 3, and concurrency slowly rises from 0.5 to 0.6. I am fairly sure the lower starting value is due to historically bad concurrency before I reduced the timeout; the dashboard time window is 30m, of which only the last 10m or so were run with the smaller timeout. The truth is that the production use case I am trying to meet will take the consumer 60-100s, so how do I go about achieving at those values what 3s gives me? The request rate won't be very high, and I intend to add exponential backoff on a per-message basis depending on the current request rate. But as a baseline, it would be nice to know why consumers aren't scaling up for an occasional 10 messages.
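(The per-message exponential backoff mentioned above could be sketched roughly like this; this is an illustration, not the poster's code. It assumes a hypothetical `handle()` function, and uses the `attempts` count that Cloudflare Queues exposes on each message.)

```javascript
// Delay grows exponentially with the delivery attempt, capped to stay bounded.
function backoffSeconds(attempts, base = 2, cap = 300) {
  // attempts is 1 on the first delivery
  return Math.min(cap, base ** attempts);
}

// Inside a queue handler this might look like (untested sketch):
//   for (const msg of batch.messages) {
//     try {
//       await handle(msg.body);
//       msg.ack();
//     } catch (e) {
//       msg.retry({ delaySeconds: backoffSeconds(msg.attempts) });
//     }
//   }
```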
sup filistine (OP) · 3w ago
Ref: https://grafana.com/docs/k6/latest/using-k6/scenarios/executors/shared-iterations/ With a shared_iterations k6 executor and setTimeout changed to 10s:
example_scenario: {
  // name of the executor to use
  executor: 'shared-iterations',

  // common scenario configuration
  startTime: '10s',
  gracefulStop: '5s',
  tags: { example_tag: 'testing' },

  // executor-specific configuration
  vus: 10,
  iterations: 20,
  maxDuration: '10s',
},
I saw 20 messages queue up quickly, and not everything had to wait 10s before consumer logs showed up (I am using BetterStack for logging, which is slightly more instantaneous than the Cloudflare dashboard). Concurrency remains around 0.8 and backlog switches between 8-10. Successive runs seem to be even better: after repeating a few more times, concurrency is 1.1 and average backlog was 13. I am not sure if this means the system is adaptive and learning, but how is it supposed to learn a 60s-100s delay in consumer processing with message failures? Shouldn't it hold on to the messages if it's training on the incoming dataset?
sup filistine (OP) · 3w ago
You know what magically fixes it? A delivery delay of some amount, say 30s. I have no idea why that works! Without it, messages are just silently dropped: when consumer runs take 60s or so, the consumer never even gets invoked for 90% of the messages. But put that delay in the dashboard, and I am golden. Yet to check what the wrangler equivalent is for setting up the delay (unlike workers, queues aren't in declarative toml, I realize).
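(For reference: wrangler does appear to support a queue-level delivery delay on the producer binding, per the Queues configuration docs; a sketch with illustrative names, so verify the key against current docs.)

```toml
# wrangler.toml — producer binding with a default delivery delay
[[queues.producers]]
queue = "my-queue"     # illustrative queue name
binding = "MY_QUEUE"
delivery_delay = 30    # seconds each message waits before it becomes deliverable
```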
Pranshu Maheshwari
Hey! This is because we scale up concurrent consumers at the end of processing each batch, documented here: https://developers.cloudflare.com/queues/configuration/consumer-concurrency/#why-are-my-consumers-not-autoscaling. So if your consumer includes a promise that waits for 60s, it won't autoscale until the end of the 60s. Messages aren't dropped, though; they're just queued up. Could you tell me how you're detecting that messages are dropped?
sup filistine (OP) · 2w ago
hello, thanks for your response. the way i am detecting dropped messages is by putting a log line as the very first line in my consumer's queue handler. i never see that log appear for 7/10 messages in the very first dropout scenario i described. also, if i go to the dashboard for the queue, it doesn't show failures at that point in time. so message 1 gets delivered, 60s elapse, then messages 2 and 3 get through, which don't have to be the chronological 2 and 3 in my experience: there is a uuid on each message, and the 3 selected out of 10 are not contiguous or in the order the producer saw them. i would be happy to do a virtual session and show a live demo of the issue, but given all i had to do was use a setTimeout queue handler implementation, i think you could repro the issue quite easily. i am using a remote logs sink in BetterStack, but they ensure via ctx.waitUntil that logs aren't missed. also, in the cloudflare dashboard for the consumer worker i guess i could look for queue events and count them, something i forgot to check last time; all i know for a fact is that there were no logs, and my consumer is pretty (log) chatty in terms of success or failure in processing. i currently catch and suppress the exception, opting to ack in both cases.

@Pranshu Maheshwari Is having max_retries=0 related to dropouts? I definitely see dropouts when that is set on my queue. Not even a first attempt at delivering a message, in case the handler that got the first batch is busy. I know you have said above that dropouts shouldn't be a thing, but I continue to see them, and I have to launch this product on this platform in the coming month. It would be good to know about its characteristics so that I can fine-tune the configuration for minimum dropouts.
Pranshu Maheshwari
Gotcha, thanks, this is helpful. It's more likely that the logs aren't being delivered than that the messages are being dropped, fwiw. Could you share your queue ID & account ID? Both are safe to share here. I'll double-check that everything is working as expected.
sup filistine (OP) · 4d ago
Thanks for getting in touch! For now, I realized I am okay with max_concurrency=1, using a package like p-limit (https://github.com/sindresorhus/p-limit) for concurrency within the worker. I will be back if I ever need more than a concurrency of 1 from the queues themselves.
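(To sketch what p-limit provides here: a minimal self-contained reimplementation of the idea for illustration, not the library's actual code. At most `n` of the supplied async tasks run at once; the rest wait their turn.)

```javascript
// Returns a function that schedules async tasks with bounded concurrency.
function pLimitLike(n) {
  let active = 0;
  const waiting = [];
  const next = () => {
    active--;
    if (waiting.length > 0) waiting.shift()();
  };
  return (fn, ...args) =>
    new Promise((resolve, reject) => {
      const run = () => {
        active++;
        Promise.resolve(fn(...args)).then(resolve, reject).finally(next);
      };
      if (active < n) run();
      else waiting.push(run); // over the limit: queue until a slot frees up
    });
}
```

In a queue handler running with max_concurrency=1, this caps in-worker parallelism, e.g. `const limit = pLimitLike(5); await Promise.all(batch.messages.map((m) => limit(() => handle(m))));` (where `handle` is a hypothetical per-message function).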