RunPod7mo ago
ribbit

How do I handle both streaming and non-streaming request in a serverless pod?

How can I handle both effectively? Is it okay to have a handler with both yield and return? i.e.
def handler(endpoint):
    if endpoint == "stream_response":
        yield stream_response()
    elif endpoint == "get_response":
        return get_response()

runpod.serverless.start(
    {
        "handler": handler,
        "return_aggregate_stream": True
    }
)
Will this work?
39 Replies
agentpietrucha
agentpietrucha7mo ago
Seems like it should work. Have you tried testing it locally? RunPod works really well locally, so it's easy to test.
justin
justin7mo ago
Eh. RunPod's local testing is pretty limited, imo. It works for basic use-case validation, but I tend to just run against a GPU Pod instead, since that seems to actually work by calling the function directly. I just skip the runpod.serverless.start(...) call when I'm on a GPU Pod. As for yield and return, you should be able to use both, but I think using yield syntax both ways doesn't matter. https://github.com/justinwlin/Runpod-OpenLLM-Pod-and-Serverless/blob/main/handler.py
GitHub
Runpod-OpenLLM-Pod-and-Serverless/handler.py at main · justinwlin/R...
A repo for OpenLLM to run pod. Contribute to justinwlin/Runpod-OpenLLM-Pod-and-Serverless development by creating an account on GitHub.
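A minimal sketch of the pattern justin describes, where one handler runs either directly on a GPU Pod or through the serverless worker. The RUN_MODE environment variable, the toy handler, and the sample job are hypothetical names for illustration and are not taken from the RunPod SDK or from the linked repo; only runpod.serverless.start is the real entry point.

```python
import os


def handler(job):
    """Toy generator handler: streams the words of the prompt one by one."""
    prompt = job["input"].get("prompt", "")
    for word in prompt.split():
        yield word


def main():
    # RUN_MODE is a hypothetical switch, not a RunPod setting.
    mode = os.environ.get("RUN_MODE", "pod")
    if mode == "serverless":
        # Imported lazily so the pod path does not require the runpod package.
        import runpod

        runpod.serverless.start({"handler": handler, "return_aggregate_stream": True})
    else:
        # On a GPU Pod, skip the worker entirely and call the handler directly.
        sample_job = {"id": "local", "input": {"prompt": "hello from the pod"}}
        for chunk in handler(sample_job):
            print(chunk)


if __name__ == "__main__":
    main()
```

Calling the handler directly like this makes it easy to step through with a debugger, at the cost of not exercising the worker's own serialization and streaming logic.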
agentpietrucha
agentpietrucha7mo ago
Nice way to run the same handler in both pod and serverless!
justin
justin7mo ago
Thx! Yeah, I love being able to debug on GPU Pod, makes it much easier.
ribbit
ribbitOP7mo ago
No, unfortunately it's not easy to test locally, but I tried deploying it anyway, and it turns out that whenever there's a yield in the handler function, everything I return becomes a generator? I can't get it to work yet.
nerdylive
nerdylive7mo ago
Yeah, it should, right? If you return using yield, the data type would be stream-like, so it's a streamed response.
if endpoint == "stream_response": # I think this isn't how you retrieve inputs?
    yield stream_response()
elif endpoint == "get_response": # I think this isn't how you retrieve inputs?
And this too.
justin
justin7mo ago
https://discord.com/channels/912829806415085598/948767517332107274/1235352455995199500 Maybe check whether you've updated your runpod version? lol, just a random guess
nerdylive
nerdylive7mo ago
lemme check the docs first
justin
justin7mo ago
But the code I linked works with both streaming and non-streaming. Surprisingly, I feel the docs don't show streaming 👁️, at least not on the client side, where there is an endpoint you gotta hit called /stream.
nerdylive
nerdylive7mo ago
{
    "id": "A_RANDOM_JOB_IDENTIFIER",
    "input": { "key": "value" }  # this is retrieved
}
your_handler.py
import runpod # Required.


def handler(job):
    job_input = job["input"] # Access the input from the request.
    # Add your custom code here.
    return "Your job results"
That's how you retrieve inputs. There is something in the docs about streaming, but only the endpoint; I went looking for that before.
justin
justin7mo ago
oh interesting maybe im just bad at looking
justin
justin7mo ago
ah found it
justin
justin7mo ago
Endpoint operations | RunPod Documentation
Comprehensive guide on interacting with models using RunPod's API Endpoints without managing the pods yourself.
nerdylive
nerdylive7mo ago
yep yep just gotta be patient haha
justin
justin7mo ago
yeaaaaa, not the best doc xD
nerdylive
nerdylive7mo ago
True, I find the Laravel docs way easier to understand.
justin
justin7mo ago
The only problem with the doc, imo, is that the client side is a bit more involved:
import time

import requests

# Stream output. endpoint_id, task_id and headers come from the earlier /run request.
status_url = f"https://api.runpod.ai/v2/{endpoint_id}/stream/{task_id}"

try:
    while True:  # Poll until the job reports COMPLETED
        time.sleep(1)  # Polling interval
        get_status = requests.get(status_url, headers=headers)
        if get_status.status_code == 200:
            status_response = get_status.json()
            if 'stream' in status_response and len(status_response['stream']) > 0:
                for item in status_response['stream']:
                    print(item['output']['text'], end='')  # Adjust based on the actual structure
            if status_response.get('status') == 'COMPLETED':
                print("\nJob completed.")
                break
        else:
            print(f"Error streaming job output: {get_status.text}")
            break
except requests.exceptions.RequestException as e:
    # The original snippet had a bare try; report network errors instead of crashing.
    print(f"An error occurred: {e}")
ribbit
ribbitOP7mo ago
Sorry, I mean that whenever a yield is present in the handler, the output becomes a generator regardless of whether I use yield or return. For example:
def handler():
    if something:
        yield value # output is a generator
    else:
        return value # this also outputs a generator
nerdylive
nerdylive7mo ago
when something is false it also returns generator? which endpoint are you hitting
ribbit
ribbitOP7mo ago
Yeah, that's not how I do it; I just wrote the example that way for convenience.
nerdylive
nerdylive7mo ago
Have you tried the /stream endpoint, or /run and /runsync? What's the output like?
ribbit
ribbitOP7mo ago
I hit the /run endpoint first, then retrieve the stream by hitting /stream. Sorry, I can't produce a screenshot right now, but in the local testing library the output looks something like this:
...runpod logs...
output: <generator object ...>
...runpod logs...
assume that this is my code
def handler():
    value = 1
    something = False
    if something:
        yield value # output is a generator
    else:
        return value # this also outputs a generator
it is expected if something is True, but somehow when something is False, it returns a generator as well
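This behaviour comes from Python itself rather than from RunPod: any function whose body contains yield is a generator function, so calling it always returns a generator object, whichever branch runs. A return value inside it does not hand the value to the caller; it ends iteration and attaches the value to StopIteration. The sketch below reproduces this without the RunPod SDK. inspect.isgeneratorfunction is the standard-library check; whether the SDK uses exactly this check internally is an assumption.

```python
import inspect


def handler(something):
    value = 1
    if something:
        yield value   # streamed item
    else:
        return value  # ends the generator; nothing is yielded


# The mere presence of `yield` makes this a generator function.
is_gen_func = inspect.isgeneratorfunction(handler)

# Both calls return generator objects, regardless of the branch taken.
streamed = handler(True)
returned = handler(False)

streamed_items = list(streamed)        # the yield branch produces one item
returned_items = list(handler(False))  # the return branch produces no items

# The returned value is still reachable, but only through StopIteration.
try:
    next(returned)
    stop_value = None
except StopIteration as stop:
    stop_value = stop.value

print(is_gen_func, streamed_items, returned_items, stop_value)
```

So a handler that mixes yield and return cannot hand back a plain value from its return branch; the caller only ever sees a generator.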
nerdylive
nerdylive7mo ago
Use /run instead. Hm, where is that from: "output: <generator object ...>"?
ribbit
ribbitOP7mo ago
Haha sorry, I was dizzy, I meant /run. That's from the RunPod local dev server log, the one that shows the output of the handler function.
nerdylive
nerdylive7mo ago
What's your code? Just send it here, I wanna see how you print it.
ribbit
ribbitOP7mo ago
STREAM_ENDPOINTS = {"stream_answer"}

def handler(job):
    validated_input = validate_input(job)
    if "errors" in validated_input:
        return {
            "error": "\n".join(validated_input["errors"])
        }

    validated_api = validate_api(job)
    if "errors" in validated_api:
        return {
            "error": "\n".join(validated_api["errors"])
        }

    endpoint, validated_payload = validate_payload(job)
    if "errors" in validated_payload:
        return {
            "error": "\n".join(validated_payload["errors"])
        }

    if "validated_input" in validated_payload:
        payload = validated_payload["validated_input"]
    else:
        payload = validated_payload

    if payload.get("emu_error", False):
        return {
            "error": "Error emulated. If this is unintended, please check your environment variables"
        }

    LOGGER.info(f"[INVOKING] {endpoint}", job["id"])
    LOGGER.info(f"[SUCCESS] Returning")

    response = invoke(endpoint, payload)

    if endpoint in STREAM_ENDPOINTS:
        for token in response:
            yield token
    else:
        return response
output
INFO | local_test | Started.
DEBUG | local_test | Handler output: <generator object handler at 0x7d0f40748890>
DEBUG | local_test | run_job return: {'output': <generator object handler at 0x7d0f40748890>}
INFO | Job local_test completed successfully.
INFO | Job result: {'output': <generator object handler at 0x7d0f40748890>}
INFO | Local testing complete, exiting.
the endpoint's value is not stream_answer, yet it always returns a generator
nerdylive
nerdylive7mo ago
What is invoke(endpoint, payload)? What is endpoint, too? Maybe one of those already returns a generator.
ribbit
ribbitOP7mo ago
from typing import Any


def invoke(
    endpoint: str,
    payload: dict
) -> Any:
    if endpoint == "...":
        response = some_func() # this does not return a generator
    elif endpoint == "...":
        response = some_func() # this does not return a generator
    elif endpoint == "answer_stream":
        response = stream_func() # this returns a generator

    return response
This is what the invoke method looks like. I omitted irrelevant code, but that's generally how the function works.
endpoint, validated_payload = validate_payload(job)
I got endpoint from this line in the handler function, which passes job into a validator function that simply checks whether my payload schema is OK. Basically, the input payload should look like this:
{
    "input": {
        "api": {
            "endpoint": "...",
        },
        ...rest of the json
    }
}
Basically, that function validates the input payload and extracts the endpoint variable from it. I am very sure that this is not the case; I tried and checked every other function. The stream function is a new addition to the code, and before that (all the functions above stream_func are the original code) no function was ever coded to return a generator.
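For readers following along, here is a rough sketch of what such a validator could look like, given the payload shape above. This validate_payload is a hypothetical stand-in written for illustration, not ribbit's actual function; the error message and the rule that everything outside "api" counts as the payload are assumptions.

```python
def validate_payload(job):
    """Hypothetical stand-in: pull the endpoint name out of job["input"]["api"]."""
    job_input = job.get("input") or {}
    api = job_input.get("api")
    if not isinstance(api, dict) or not isinstance(api.get("endpoint"), str):
        # Mirror the thread's convention of reporting problems under "errors".
        return None, {"errors": ["input.api.endpoint is required and must be a string"]}
    # Assumption: everything except the "api" section is the payload.
    payload = {key: value for key, value in job_input.items() if key != "api"}
    return api["endpoint"], payload


job = {"id": "local_test", "input": {"api": {"endpoint": "get_answer"}, "question": "hi"}}
endpoint, validated_payload = validate_payload(job)
print(endpoint, validated_payload)
```

Note that nothing here involves yield, which is consistent with ribbit's point that the validator is not where the generator comes from.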
justin
justin7mo ago
Why does it matter whether it's a generator or not? If there is nothing else being sent, then the response is just done anyway, whether you use yield or return.
nerdylive
nerdylive7mo ago
Yeah, true. You should debug where the response becomes type "generator", e.g. by logging the return values and the variables.
ribbit
ribbitOP7mo ago
Because the change of response type would disrupt other running services, and I aim to avoid that.
import runpod


def dummy(job):
    something = False
    if something:
        yield 1
    else:
        return 1


if __name__ == "__main__":
    runpod.serverless.start(
        {
            "handler": dummy,
            "return_aggregate_stream": True
        }
    )
this code also returns a generator
INFO | local_test | Started.
DEBUG | local_test | Handler output: <generator object dummy at 0x7376fcbdedc0>
DEBUG | local_test | run_job return: {'output': <generator object dummy at 0x7376fcbdedc0>}
INFO | Job local_test completed successfully.
INFO | Job result: {'output': <generator object dummy at 0x7376fcbdedc0>}
INFO | Local testing complete, exiting.
I think it's just the way it is. Thank you all!
nerdylive
nerdylive7mo ago
alright
justin
justin7mo ago
I think this is a problem with local testing. It shouldn't return a generator, it should just be 1.
ribbit
ribbitOP7mo ago
{
    "delayTime": 9289,
    "executionTime": 54,
    "id": "bc6aaadd-0970-4100-99a0-f4230d18be4f-e1",
    "output": [],
    "status": "COMPLETED"
}
It still returned a generator when the same code ran on serverless. There's no log to prove that it's a generator, but it shows the same behavior as before, when all my outputs were turned into generators: it returns [] and an empty stream when streamed.
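The empty list is what one would expect if the worker simply collects whatever the generator yields: with the else branch taken, nothing is yielded, so there is nothing to aggregate. The sketch below imitates that aggregation with a plain list(...) call; aggregate is a stand-in, not the RunPod SDK's actual code, and the "stream" input flag is hypothetical. It also shows one possible single-endpoint workaround, yielding the full response once instead of returning it, at the cost that the aggregated output is then presumably a one-element list rather than the bare value.

```python
def dummy(job):
    something = False
    if something:
        yield 1
    else:
        return 1  # swallowed: a generator's return value is never yielded


def dummy_always_yield(job):
    stream = job["input"].get("stream", False)  # hypothetical input flag
    if stream:
        for token in ("a", "b", "c"):
            yield token
    else:
        yield 1  # non-streaming result sent as a single chunk


def aggregate(handler, job):
    """Stand-in for an aggregating worker: collect every yielded item."""
    return list(handler(job))


empty = aggregate(dummy, {"input": {}})
single = aggregate(dummy_always_yield, {"input": {}})
tokens = aggregate(dummy_always_yield, {"input": {"stream": True}})
print(empty, single, tokens)
```

Because the workaround changes the response shape for existing non-streaming clients, it does not meet ribbit's requirement of leaving other services undisturbed.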
nerdylive
nerdylive7mo ago
Does normal streaming work? You might need to contact RunPod via the website or email about this problem.
ribbit
ribbitOP7mo ago
Normal streaming works well. We now have 2 separate endpoints, one to handle all non-streaming requests and one to handle all streaming requests, hahah. Will try to do so.
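A rough sketch of that split, assuming each endpoint runs its own worker with its own handler. The invoke stub, the token values, and the handler names are hypothetical; the two runpod.serverless.start calls are shown only as comments because each would live in a separate worker.

```python
def invoke(endpoint, payload):
    """Hypothetical stand-in for the thread's invoke() dispatcher."""
    if endpoint == "stream_answer":
        return (token for token in ("Hello", " ", "world"))
    return {"endpoint": endpoint, "echo": payload}


def stream_handler(job):
    """Handler for the streaming endpoint: a generator function."""
    for token in invoke("stream_answer", job["input"]):
        yield token


def sync_handler(job):
    """Handler for the non-streaming endpoint: no yield, so return works as usual."""
    endpoint = job["input"]["api"]["endpoint"]
    return invoke(endpoint, job["input"])


# Each worker would register only its own handler, for example:
#   runpod.serverless.start({"handler": stream_handler, "return_aggregate_stream": True})
#   runpod.serverless.start({"handler": sync_handler})

streamed = list(stream_handler({"input": {}}))
result = sync_handler({"input": {"api": {"endpoint": "get_answer"}}})
print(streamed, result)
```

Keeping yield out of the non-streaming handler is what preserves the original response type for existing clients.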
nerdylive
nerdylive7mo ago
yeah nice fix for this