RunPod12mo ago
justin

Does an async generator handler let a worker take multiple jobs off the queue? Concurrency modifier?

I was reading the RunPod docs and saw the example below. Does an async generator handler mean that if I send, say, 5 jobs, one worker will keep picking up new jobs while the earlier ones are still running? I also tried adding:
"concurrency_modifier": 4
But when I queued 10 jobs, the endpoint first scaled to the maximum number of workers and then left the remaining jobs sitting in the queue, rather than each worker picking up jobs up to the concurrency modifier. https://docs.runpod.io/serverless/workers/handlers/handler-async
import runpod
import asyncio

async def async_generator_handler(job):
    for i in range(5):
        # Generate an asynchronous output token
        output = f"Generated async token output {i}"
        yield output

        # Simulate an asynchronous task, such as processing time for a large language model
        await asyncio.sleep(1)


# Configure and start the RunPod serverless function
runpod.serverless.start(
    {
        "handler": async_generator_handler,  # Required: Specify the async handler
        "return_aggregate_stream": True,  # Optional: Aggregate results are accessible via /run endpoint
    }
)
Asynchronous Handler | RunPod Documentation
RunPod supports the use of asynchronous handlers, enabling efficient handling of tasks that benefit from non-blocking operations. This feature is particularly useful for tasks like processing large datasets, interacting with APIs, or handling I/O-bound operations.
43 Replies
Alpay Ariyak
Alpay Ariyak12mo ago
Try “concurrency_modifier”: lambda x: <MAX CONCURRENCY, e.g. 4>
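Editor's sketch: as far as this thread shows, the value under "concurrency_modifier" is expected to be a callable that receives the current concurrency and returns the desired one, which is why the bare integer 4 had no effect. The helper name fixed_concurrency and the stub handler are hypothetical, and the actual runpod.serverless.start call is left as a comment so the snippet runs without the SDK.

```python
import asyncio


async def handler(job):
    # Stub handler for illustration only
    await asyncio.sleep(0)
    return job


def fixed_concurrency(max_jobs):
    """Build a callable for the "concurrency_modifier" key (hypothetical helper)."""
    def modifier(current_concurrency):
        # Ignore the current level and always ask for max_jobs
        return max_jobs
    return modifier


config = {
    "handler": handler,
    "concurrency_modifier": fixed_concurrency(4),  # same effect as lambda x: 4
}

# Inside a worker image you would pass this to runpod.serverless.start(config)
requested = config["concurrency_modifier"](1)
```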
justin
justinOP12mo ago
I'm not sure it works? Hm. I sent 3 requests to the handler, and the third request just sits there by itself.
import asyncio
import runpod


async def async_generator_handler(job):
    '''
    async generator handler job
    '''
    for i in range(5):
        # Generate an asynchronous output token
        output = f"Generated async token output {i}; \n {job}"
        yield output

        # Simulate an asynchronous task, such as processing time for a large language model
        await asyncio.sleep(30)


# Configure and start the RunPod serverless function
runpod.serverless.start(
    {
        "handler": async_generator_handler,
        "return_aggregate_stream": False,
        "concurrency_modifier": lambda x: 4,
    }
)
[4 image attachments, no description]
Alpay Ariyak
Alpay Ariyak12mo ago
@Justin Merrell
Justin Merrell
Justin Merrell12mo ago
Do you see anything in the logs?
justin
justinOP12mo ago
Sorry, meant to respond. I don't see anything in my logs. It just responds with what is expected: Generated async token output {i}; \n {job}. So the async generator works. My expectation is that if I send 10 requests into the queue with the async generator handler:
1. The number of workers first scales up to the max (say, 3).
2. That leaves 7 jobs in the queue.
3. Even though each worker is waiting on the 30-second asyncio.sleep, because the handler is an async generator it keeps pulling from the queue, so each worker takes up to X-1 more jobs, where X is the value the lambda returns.
That way my jobs don't just sit in the queue, and each worker handles work more efficiently. @Justin Merrell Is that maybe not the expected behavior? Honestly, I would just love to have one worker take more jobs at once; I haven't even been able to get a hello world working.
Justin Merrell
Justin Merrell12mo ago
The concurrency modifier is set at the worker level and tells the worker how many jobs it should try to maintain at any given time.
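Editor's sketch: a toy simulation of "how many jobs the worker should try to maintain at any given time", written with plain asyncio. It is not the RunPod SDK's implementation; simulated_worker, the in-memory queue, and the short sleep are all assumptions made for illustration.

```python
import asyncio


async def handler(job):
    await asyncio.sleep(0.05)  # stand-in for real work
    return f"done {job}"


async def simulated_worker(jobs, concurrency_modifier):
    queue = asyncio.Queue()
    for job in jobs:
        queue.put_nowait(job)

    in_flight = set()
    results = []
    peak = 0
    current = 1  # assumed starting concurrency

    while not queue.empty() or in_flight:
        # Ask the modifier how many jobs to keep running
        current = concurrency_modifier(current)
        # Top up until that many jobs are in flight
        while len(in_flight) < current and not queue.empty():
            in_flight.add(asyncio.create_task(handler(queue.get_nowait())))
        peak = max(peak, len(in_flight))
        # Wait for at least one job to finish, then loop and top up again
        done, in_flight = await asyncio.wait(
            in_flight, return_when=asyncio.FIRST_COMPLETED
        )
        results.extend(task.result() for task in done)

    return results, peak


results, peak = asyncio.run(simulated_worker(range(10), lambda current: 4))
```

With 10 queued jobs and a modifier that returns 4, the simulated worker never holds more than 4 jobs at once and drains the queue as slots free up.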
justin
justinOP12mo ago
I think one of the issues is that this may not be working properly. I can see, for example, that the first two jobs have a delay time of 4 to 8 seconds, while the others have an ever-increasing delay time (from sitting in the queue).
[Image attachment, no description]
justin
justinOP12mo ago
I just feel this is unexpected behavior: the worker still isn't picking up the number of jobs defined by the concurrency_modifier. I wouldn't expect the jobs to just sit in the queue. Hopefully you can give some advice or look into it at some point. Below is the exact code; it feels a bit weird that I can't get something this simple to work.
import asyncio
import runpod


async def async_generator_handler(job):
    '''
    async generator handler job
    '''
    for i in range(5):
        # Generate an asynchronous output token
        output = f"Generated async token output {i}; \n {job}"
        yield output

        # Simulate an asynchronous task, such as processing time for a large language model
        await asyncio.sleep(10)


# Configure and start the RunPod serverless function
runpod.serverless.start(
    {
        "handler": async_generator_handler,
        "return_aggregate_stream": True,
        "concurrency_modifier": lambda x: 4,
    }
)
Justin Merrell
Justin Merrell12mo ago
Try running it again with 1 active worker and 1 max worker
justin
justinOP12mo ago
@Justin Merrell Tried it out, and still the same result: with 1 active worker and 1 max worker, only the first request is taken.
justin
justinOP12mo ago
@Justin Merrell Is it maybe b/c it isn't concurrency_modifier but rather max_concurrency?
justin
justinOP12mo ago
[Image attachment, no description]
justin
justinOP12mo ago
I was looking at the runpod SDK: https://github.com/runpod/runpod-python/blob/main/runpod/serverless/core.py#L175 And maybe this is the code block where you guys are grabbing the max_concurrency from? as some sort of key?
justin
justinOP12mo ago
I see you do similar things for return_aggregate_stream, and so on.
Justin Merrell
Justin Merrell12mo ago
Where is this screenshot coming from? And what does your entire handler file look like?
justin
justinOP12mo ago
justin
justinOP12mo ago
And this is the entire handler.py file
justin
justinOP12mo ago
[Image attachment, no description]
justin
justinOP12mo ago
I basically just took https://github.com/blib-la/runpod-worker-helloworld and replaced rp_handler.py with the code block I posted in the Discord. (Well, I'm trying max_concurrency now; before, it was concurrency_modifier.)
Justin Merrell
Justin Merrell12mo ago
Oh, the code file is different; I'm reworking the RunPod SDK now to make sure things are clearer. And change "max_concurrency" to "concurrency_modifier".
justin
justinOP12mo ago
Yeah, it was: concurrency_modifier > but it wasn't working 😅.
import asyncio
import runpod


async def async_generator_handler(job):
    '''
    async generator handler job
    '''
    for i in range(5):
        # Generate an asynchronous output token
        output = f"Generated async token output {i}; \n {job}"
        yield output

        # Simulate an asynchronous task, such as processing time for a large language model
        await asyncio.sleep(10)


# Configure and start the RunPod serverless function
runpod.serverless.start(
    {
        "handler": async_generator_handler,
        "return_aggregate_stream": True,
        "concurrency_modifier": lambda x: 4,
    }
)
Yeah... anyway, concurrency_modifier and max_concurrency both don't seem to work right now 😔, at least as far as I can tell.
Justin Merrell
Justin Merrell12mo ago
is it the for loop that is blocking? To be honest I am not a fan of python async haha https://stackoverflow.com/questions/53486744/making-async-for-loops-in-python @PatrickR did we ever release that example to docs?
justin
justinOP12mo ago
Hmm, I'm not too sure haha. I was just trying to follow RunPod's example for generators: https://docs.runpod.io/serverless/workers/handlers/handler-async I wouldn't expect the for loop or asyncio.sleep() to block the concurrency_modifier. My thought is that the SDK probably just calls the handler more times without waiting for each call to complete, which is why the handler is declared with async def. I know you said the SDK on GitHub is out of date, but I imagine the logic is the same even after the refactoring:
for job in jobs:
    asyncio.create_task(_process_job(config, job, serverless_hook), name=job['id'])
    await asyncio.sleep(0)
It gets a list of jobs and creates one task per job with asyncio.create_task(_process_job(config, job, serverless_hook)). Yeah T-T, Python async really sucks; I've come to hate it after having to deal with asyncio myself before. I also thought my handler was close enough to worker-vllm, which you guys had pointed to: https://github.com/runpod-workers/worker-vllm/blob/main/src/handler.py
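Editor's sketch: the one-task-per-job pattern described above, reproduced with plain asyncio to show that a regular for loop with await inside an async generator does not stop other jobs from making progress. process_job and the job dictionaries are hypothetical stand-ins, not the SDK's _process_job.

```python
import asyncio


async def async_generator_handler(job):
    for i in range(2):
        yield f"{job['id']}-token-{i}"
        await asyncio.sleep(0.01)  # yields control to the event loop


async def process_job(job, events):
    # Consume the handler's stream and record the order tokens arrive in
    async for output in async_generator_handler(job):
        events.append(output)


async def main():
    events = []
    jobs = [{"id": "a"}, {"id": "b"}, {"id": "c"}]
    tasks = [
        asyncio.create_task(process_job(job, events), name=job["id"])
        for job in jobs
    ]
    await asyncio.gather(*tasks)
    return events


events = asyncio.run(main())
```

Every job emits its first token before any job emits its second one, so the handlers interleave rather than run back to back.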
Justin Merrell
Justin Merrell12mo ago
it might need that async for
justin
justinOP12mo ago
Hm... I'll give that a try. I'll just get rid of the for loop; I'm not sure you can use async for on a range(5) iterator. At least, the linter is throwing errors.
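Editor's sketch: why the linter complains. async for needs an object that implements __aiter__, and range() does not, so Python raises TypeError at runtime. A small async generator wrapper (arange, a hypothetical name) makes the same loop legal, although a plain for loop with an await inside is usually enough.

```python
import asyncio


async def arange(stop):
    """Hypothetical async counterpart to range()."""
    for i in range(stop):
        yield i
        await asyncio.sleep(0)  # give other tasks a turn between items


async def main():
    try:
        async for i in range(3):  # range() has no __aiter__
            pass
    except TypeError as exc:
        error = type(exc).__name__
    else:
        error = None

    collected = [i async for i in arange(3)]
    return error, collected


error, collected = asyncio.run(main())
```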
Justin Merrell
Justin Merrell12mo ago
maybe adding a await async.sleep(10) in there?
justin
justinOP12mo ago
I think it needs to be await asyncio.sleep(10); async.sleep(10) just gives me an error. I tried the code below and it still failed:
import asyncio
import runpod


async def async_generator_handler(job):
    '''
    async generator handler job
    '''
    # Simulate an asynchronous task, such as processing time for a large language model
    await asyncio.sleep(10)
    # Generate an asynchronous output token
    output = f"Generated async token output: {job}"
    yield output


# Configure and start the RunPod serverless function
runpod.serverless.start(
    {
        "handler": async_generator_handler,
        "return_aggregate_stream": True,
        "max_concurrency": lambda x: 4,
    }
)

# depot build -t justinwlin/runpodhelloworldasync:1.4 . --platform linux/amd64 --push
PatrickR
PatrickR12mo ago
https://github.com/runpod/docs/pull/24 I have a pull request for review.
justin
justinOP12mo ago
@PatrickR / @Justin Merrell Does it need to be lambda x: 4, or does it need to be an integer, going by the documentation PatrickR just posted? Oh wait, it probably does need to be a function: I see that adjust_concurrency is a function that takes current_concurrency.
FYI, it would be great if you could add some info to the docs on what current_concurrency is. The way it reads right now, I'd assume it's either the number of jobs in the queue or the default current concurrency, which is 1. Having read through what the code is doing, I think it's the latter.
Also, I don't know if update_request_rate belongs in the documentation, because as far as I know this isn't something the client can normally access. I can't tell the rate at which jobs are entering the queue, so the current example, where the rate is randomized, doesn't correspond to anything I could do as the client. Anyone else reading the document would be left wondering how to modify the concurrency based on the number of jobs in the queue, which is why I feel the documentation is a bit misleading.
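Editor's sketch: one way to drive adjust_concurrency from something the worker can actually observe, instead of a randomized request rate. It assumes, as guessed above, that the SDK passes the worker's current concurrency level; that is not confirmed in this thread. record_arrival, the thresholds, and the optional now parameter (only there to make the function testable) are all hypothetical.

```python
import time
from collections import deque

MIN_CONCURRENCY = 1   # hypothetical lower bound
MAX_CONCURRENCY = 8   # hypothetical upper bound
WINDOW_SECONDS = 10.0  # how far back to count arrivals
HIGH_RATE = 5          # arrivals per window that count as "busy"

_arrivals = deque()


def record_arrival(now=None):
    """Call at the top of the handler so the worker tracks its own job rate."""
    _arrivals.append(time.monotonic() if now is None else now)


def adjust_concurrency(current_concurrency, now=None):
    """Step concurrency up when busy and down when quiet."""
    now = time.monotonic() if now is None else now
    # Drop arrivals that fell out of the window
    while _arrivals and now - _arrivals[0] > WINDOW_SECONDS:
        _arrivals.popleft()
    if len(_arrivals) >= HIGH_RATE:
        return min(current_concurrency + 1, MAX_CONCURRENCY)
    return max(current_concurrency - 1, MIN_CONCURRENCY)


# Simulate five jobs arriving one second apart, then a long quiet period
for t in range(5):
    record_arrival(now=float(t))
busy_level = adjust_concurrency(1, now=5.0)
quiet_level = adjust_concurrency(busy_level, now=100.0)
```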
justin
justinOP12mo ago
FYI, I tried a modified version of the function from the docs, and I also tried following the docs exactly, just with high_request_rate_threshold = 1. I got an error both times.
{
"delayTime": 22778,
"error": "{\"error_type\": \"<class 'KeyError'>\", \"error_message\": \"'data'\", \"error_traceback\": \"Traceback (most recent call last):\\n File \\\"/usr/local/lib/python3.10/site-packages/runpod/serverless/modules/rp_job.py\\\", line 134, in run_job\\n job_output = await handler_return if inspect.isawaitable(handler_return) else handler_return\\n File \\\"/rp_handler.py\\\", line 10, in process_request\\n return f\\\"Processed: {job['data']}\\\"\\nKeyError: 'data'\\n\", \"hostname\": \"fnyqdflnz8kizc-64411280\", \"worker_id\": \"fnyqdflnz8kizc\", \"runpod_version\": \"1.3.7\"}",
"executionTime": 2168,
"id": "50089ca9-d87a-4266-9f98-b7f68e39f7e2-u1",
"status": "FAILED"
}
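Editor's sketch: the "error" field in the response above is itself a JSON-encoded string, so decoding it makes the useful parts (error type, message, SDK version) easier to read. The sample response below is a shortened, hand-built stand-in for the real payload, and summarize_failure is a hypothetical helper.

```python
import json

# Shortened stand-in for the failed response shown above
sample_response = {
    "status": "FAILED",
    "error": json.dumps(
        {
            "error_type": str(KeyError),
            "error_message": "'data'",
            "runpod_version": "1.3.7",
        }
    ),
}


def summarize_failure(response):
    """Decode the nested error string and keep the fields worth reading first."""
    details = json.loads(response["error"])
    keys = ("error_type", "error_message", "runpod_version")
    return {key: details.get(key) for key in keys}


summary = summarize_failure(sample_response)
```

Here the decoded runpod_version is the detail that matters: it shows the worker image was running an SDK older than the one the docs example targets.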
justin
justinOP12mo ago
Anyway, I'll leave this for now 😅. I guess this just isn't working right now.
PatrickR
PatrickR12mo ago
@justin The function adjust_concurrency should be written by you and for your use case. The documentation is showing what you could use and make. But there is no way the example can accurately handle each user's use case. Also, this was added in 1.4, so make sure your Python SDK is at least on that version.
"runpod_version": "1.3.7"}
Change log: https://github.com/runpod/runpod-python/blob/bd68176db7feb6059a3d73b41f6ec6d0d83e0f15/CHANGELOG.md?plain=1#L82C12-L82C17 You can check your SDK version by running:
import runpod

version = runpod.version.get_version()

print(f"RunPod version number: {version}")
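Editor's sketch: comparing a version string against the 1.4 minimum mentioned above. It is pure Python so it runs without the SDK; in a worker you would feed it the string returned by runpod.version.get_version(). The helper names are hypothetical, and the parsing only looks at the leading digits of the first three dotted parts.

```python
from itertools import takewhile

MIN_VERSION = (1, 4, 0)  # concurrency_modifier was added in 1.4, per this thread


def version_tuple(version):
    """Turn '1.3.7' into (1, 3, 7), padding short versions with zeros."""
    numbers = []
    for piece in version.split(".")[:3]:
        digits = "".join(takewhile(str.isdigit, piece))
        numbers.append(int(digits) if digits else 0)
    numbers += [0] * (3 - len(numbers))
    return tuple(numbers)


def supports_concurrency_modifier(version):
    return version_tuple(version) >= MIN_VERSION


old_ok = supports_concurrency_modifier("1.3.7")  # version from the first error
new_ok = supports_concurrency_modifier("1.6.0")  # version from the later error
```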
justin
justinOP12mo ago
Got it~ will give it a try! Thank you! #THANKS to @PatrickR and @Justin Merrell The documentation examples work 🙂
Solution
justin
justin12mo ago
import runpod
import asyncio
import random


async def process_request(job):
    await asyncio.sleep(10)  # Simulate processing time
    return f"Processed: {job['data']}"

def adjust_concurrency(current_concurrency):
    """
    Adjusts the concurrency level based on the current request rate.
    """
    return 5

# Start the serverless function with the handler and concurrency modifier
runpod.serverless.start({
    "handler": process_request,
    "concurrency_modifier": adjust_concurrency
})
justin
justinOP12mo ago
😄
justin
justinOP12mo ago
[Image attachment, no description]
justin
justinOP12mo ago
5 jobs instantly picked up. Oh lol, but I still got an error... hmm, I'll keep playing with it xDDD, but interesting.
{
"delayTime": 14453,
"error": "{\"error_type\": \"<class 'KeyError'>\", \"error_message\": \"'data'\", \"error_traceback\": \"Traceback (most recent call last):\\n File \\\"/usr/local/lib/python3.10/site-packages/runpod/serverless/modules/rp_job.py\\\", line 135, in run_job\\n job_output = await handler_return if inspect.isawaitable(handler_return) else handler_return\\n File \\\"/rp_handler.py\\\", line 8, in process_request\\n return f\\\"Processed: {job['data']}\\\"\\nKeyError: 'data'\\n\", \"hostname\": \"e0adia1urlntl9-64410cdc\", \"worker_id\": \"e0adia1urlntl9\", \"runpod_version\": \"1.6.0\"}",
"executionTime": 10150,
"id": "b32f3b62-2129-4044-9f8c-b32baf33ea33-u1",
"status": "FAILED"
}
Lol, so close. I'll keep messing with it, but at least it threw 5 errors in parallel xD. Oh, it's probably because I don't have data in the job. Got it, it's actually the ['data'] lookup.
Justin Merrell
Justin Merrell12mo ago
Change it to input, you were on prototype docs lol
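Editor's sketch: the handler from the solution with the key changed from 'data' to 'input', as suggested above. It is run locally with a hand-made job dictionary (sample_job is hypothetical) and a shortened sleep, so it does not need the SDK. Using .get avoids a KeyError if a request arrives without an input payload.

```python
import asyncio


async def process_request(job):
    await asyncio.sleep(0.01)  # Simulate processing time (shortened for a local run)
    job_input = job.get("input", {})  # the request payload lives under "input"
    return f"Processed: {job_input}"


# Hypothetical job shaped like what a worker receives
sample_job = {"id": "local-test", "input": {"prompt": "hello"}}
result = asyncio.run(process_request(sample_job))
empty_result = asyncio.run(process_request({"id": "no-input"}))
```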
justin
justinOP12mo ago
hehe thank u thoooo 😄
PatrickR
PatrickR12mo ago
Yes. Just fixed docs to use the right key.
justin
justinOP12mo ago
Yay Im so happy
PatrickR
PatrickR12mo ago
Thanks for playing along! Super great to see it helping.
justin
justinOP12mo ago
Yeah, I'm working on a busybox-style base template for RunPod with OpenSSH, Jupyter Notebook, and all that, which I can set up easily for all my future Docker images, so this is super helpful~ I wanted to add a concurrency example to it, and I also want to add concurrency to my current projects so I'm not just wasting GPU power. Thank you guys ;D
