RunPod • 10mo ago
bggai

Worker hangs for a really long time, performance is not close to what it should be

Hi, I'm working with a transcription and diarization endpoint. The Docker image works great, tested locally and also inside a worker; I SSH'd into the worker and tested using:
python handler.py --test_input '{"input": {"endpoint": "transcribe_option", "file_path": "dev/tmp/test_files/FastAPI_Introduction_-_Build_Your_First_Web_App_-_Python_Tutorial.mp4", "is_diarization": true}}'
The processing time is around 1 minute for this video (11 min), works great; these are the logs I get from running the same request inside the worker -> Message.txt appended. Once I test this endpoint using a normal request, the worker behaves completely abnormally, taking more than 5-6 minutes just to start the transcription, then even more minutes transcribing. The really weird part is that I tested the handler in the worker itself using SSH. I have no idea how to debug this or what might be happening:
2024-02-04T03:05:20.295007097Z --- Starting Serverless Worker | Version 1.6.0 ---
2024-02-04T03:10:30.435312352Z {"requestId": "c81dbe2d-2000-47a8-9336-3c056b9576ca-u1", "message": "Started.", "level": "INFO"}
2024-02-04T03:10:30.596425577Z credentials.py :1123 2024-02-04 03:10:30,596 Found credentials in environment variables.
2024-02-04T03:11:12.226402320Z ic| type(audio_file): <class '_io.BytesIO'>
2024-02-04T03:17:26.405880274Z transcribe.py :263 2024-02-04 03:17:26,405 Processing audio with duration 11:56.266
2024-02-04T03:21:45.191995763Z transcribe.py :317 2024-02-04 03:21:45,191 Detected language 'en' with probability 1.00
This should happen in a matter of seconds, just like the logs from the manual execution within the worker show:
--- Starting Serverless Worker | Version 1.6.0 ---
{"requestId": null, "message": "test_input set, using test_input as job input.", "level": "INFO"}
{"requestId": "local_test", "message": "Started.", "level": "INFO"}
credentials.py :1123 2024-02-04 03:15:50,178 Found credentials in environment variables.
ic| type(audio_file): <class '_io.BytesIO'>
transcribe.py :263 2024-02-04 03:15:58,890 Processing audio with duration 11:56.266
transcribe.py :317 2024-02-04 03:16:00,622 Detected language 'en' with probability 1.00
Weirdly enough, I have another endpoint using just transcription, and it's also fast:
2024-02-04T03:28:55.812030213Z ic| 'Initializing transcribe with files'
2024-02-04T03:28:55.865688543Z credentials.py :1123 2024-02-04 03:28:55,865 Found credentials in environment variables.
2024-02-04T03:29:00.602901119Z ic| 'transcribe_wfiles'
2024-02-04T03:29:00.612098062Z ic| type(audio_file): <class '_io.BytesIO'>
2024-02-04T03:29:33.423009670Z transcribe.py :263 2024-02-04 03:29:33,422 Processing audio with duration 11:56.266
2024-02-04T03:30:02.189243458Z transcribe.py :317 2024-02-04 03:30:02,188 Detected language 'en' with probability 1.00
[image attachment]
Solution:
Take a look at our implementation of Fast Whisper https://github.com/runpod-workers/worker-faster_whisper/blob/main/src/rp_handler.py Your code is already blocking, async is likely just introducing complexities...
45 Replies
bggai
bggaiOP • 10mo ago
@Madiator2011 here
Madiator2011
Madiator2011 • 10mo ago
Probably it's downloading the input file, though it looks like you're using CPU and not GPU. Is that custom code?
bggai
bggaiOP • 10mo ago
Yes, it is custom code, but I have every model in cache. If I log in to the worker with SSH, the code runs smoothly in just 1-2 min for an 11-minute video, and even 6-8 minutes for a 2.5-hour video, but once the worker is actually called it behaves completely differently.
Madiator2011
Madiator2011 • 10mo ago
Do you use a local file uploaded to the worker, or download the input file from a remote location?
bggai
bggaiOP • 10mo ago
The file is in AWS S3. It does take more time, but that is not the problem; you can see it actually prints the type of the file and everything, but once it starts transcription or diarization it hangs.
bggai
bggaiOP • 10mo ago
performance of just transcription is close now
[image attachment]
bggai
bggaiOP • 10mo ago
But once the file is too big it takes a lot of time. Why is it that this does not take that much time when I manually execute the handler using SSH into the worker?
Madiator2011
Madiator2011 • 10mo ago
I think it's probably that you're giving it a path to a local file.
bggai
bggaiOP • 10mo ago
Could you explain better? What I do is:
1. Send the S3 path to the worker
2. Download the object as bytes, just using memory; that way I do not download the file to disk
3. Apply transcription
For small files this handles it well; for bigger files the difference is huge.
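Roughly, step 2 looks like this (a minimal sketch, assuming boto3; the config import is hypothetical, standing in for wherever AWS_BUCKET_NAME actually lives):
import io
import boto3
from app.config import settings  # hypothetical config module holding AWS_BUCKET_NAME

s3 = boto3.client("s3")  # picks up credentials from environment variables

def download_s3_object_to_memory(file_path: str) -> io.BytesIO:
    # Read the whole object into memory; nothing is written to disk.
    obj = s3.get_object(Bucket=settings.AWS_BUCKET_NAME, Key=file_path)
    return io.BytesIO(obj["Body"].read())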
Madiator2011
Madiator2011 • 10mo ago
Though you're still getting latency.
bggai
bggaiOP • 10mo ago
Shouldn't the performance within SSH be the same then?
Madiator2011
Madiator2011 • 10mo ago
Reading a file from a remote server will be slower than reading a locally stored file.
bggai
bggaiOP • 10mo ago
But if you have the local file, you need to load it into memory either way; I'm doing the same? Are you saying that even if I download the object as bytes, it will be faster to download to disk and then open as bytes? Seems weird.
Madiator2011
Madiator2011 • 10mo ago
You're also adding network latency when reading the file.
bggai
bggaiOP • 10mo ago
I know, but I have been saying the difference is mainly the GPU process, not reading the file. I know that can take time and it does, but why should this affect the GPU process once the file is already loaded into Python memory?
Madiator2011
Madiator2011 • 10mo ago
I do not know the code of your worker, so I can't help.
bggai
bggaiOP • 10mo ago
Here is my code:
async def async_transcribe_wfiles(
    audio_file,
    filename: str,
    file_path: str,
):
    # Create timestamp
    ic(type(audio_file))
    loop = asyncio.get_event_loop()
    # Faster whisper
    segments, info = await loop.run_in_executor(None, lambda: whisper_model.transcribe(audio_file, beam_size=5))
    segments = await loop.run_in_executor(None, lambda: list(segments))
    ic(segments)
    # Create directory
    #os.makedirs(f'/tmp/{org_id}/{session}', exist_ok=True)
    work_dir = f'{file_path.split(f"/{filename}")[0]}'
    ic(work_dir)
    os.makedirs(work_dir, exist_ok=True)

    # Create files to upload
    segments_x = [{"text": segment.text, "start": round(segment.start, 3), "end": round(segment.end, 3)} for segment in segments]

    json_path = f'{work_dir}/{filename}_{info.language}_segments.json'
    # Check if SRT file (subtitles option is being called)
    srt_path = f'{work_dir}/{filename}_transcribe.srt'
    srt_content = await loop.run_in_executor(None, create_srt, segments_x)
    txt_path = f'{work_dir}/{filename}_transcribe.txt'
    txt_content = "".join([x['text'] for x in segments_x])  # Return this variable.
    # Save to S3 using bytes
    await loop.run_in_executor(None, save_string_to_s3, txt_path, txt_content)
    await loop.run_in_executor(None, save_string_to_s3, srt_path, srt_content)
    await async_write_json(json_path, segments_x)
    await async_upload_s3(json_path, settings.AWS_BUCKET_NAME, f'{work_dir}/{filename}_{info.language}_segments.json')
    ic(f'{filename}_{info.language}_segments.json')
    return info, txt_path, srt_path, json_path


async def transcribe_option(
    file_path: str,
    is_diarization: bool,
):
    # Download from s3
    loop = asyncio.get_event_loop()
    file_contents = await loop.run_in_executor(None, download_s3_object_to_memory, file_path)
    filename, file_extension, clean_filename = sanitize_file_name_s3path(file_path)

    # Create transcription
    info, txt_path, srt_path, json_path = await async_transcribe_wfiles(file_contents, filename, file_path)

    return {
        "transcripts": [txt_path, srt_path],
        "type": "diarization" if is_diarization else "transcription"
    }


### Handler
# Define the handler function.
def handler(job):
    job_input = job["input"]  # Access the input from the request.
    endpoint = job_input["endpoint"]
    try:
        # Check endpoint and process
        if endpoint == "fast_transcribe":
            job_input.pop("endpoint")
            return transcribe(**job_input)
        if endpoint == "transcribe":
            job_input.pop("endpoint")
            return transcribe_wfiles(**job_input)
        if endpoint == "transcribe_option":
            job_input.pop("endpoint")
            return transcribe_option(**job_input)
        if endpoint == "transcribe_diarize":
            job_input.pop("endpoint")
            return transcribe_diarize(**job_input)
    except Exception as e:
        return {"error": f"Invalid input data: {str(e)}"}


runpod.serverless.start({"handler": handler})
file_contents is the file in memory. Same video, and now it hangs forever; I even changed to a 24 GB Pro GPU.
bggai
bggaiOP • 10mo ago
More than 10 minutes and still no transcription; the file is already loaded per the logs.
[image attachment]
bggai
bggaiOP • 10mo ago
[image attachment]
bggai
bggaiOP • 10mo ago
The performance is not stable at all; it still works at great speed while doing SSH. Why is the worker working faster in SSH?? It does not make any sense; it's supposed to have the same performance, as in the image I sent. The worker is completely stuck. I just ran the handler manually in SSH in the worker and it works exceptionally fast; this is what I expect from the serverless worker.
bggai
bggaiOP • 10mo ago
[image attachment]
bggai
bggaiOP • 10mo ago
worker still frozen ... ssh already finished
[image attachment]
[image attachment]
bggai
bggaiOP • 10mo ago
@justin please, anyone that can help; I'm being billed for every second. Can anyone from RunPod actually help me, please? @Justin Merrell @Polar @Finley tagging admins in hope someone actually helps. @Madiator2011 did not answer any question nor try to help me. Please, we really want to scale using RunPod; I did not have any problem using constant pods, but serverless has been completely unpredictable.
Justin Merrell
Justin Merrell • 10mo ago
I am trying to catch up on the issue, what is the concern again? Have you tried adding any print statements within your handler to see where it might be getting caught up?
bggai
bggaiOP • 10mo ago
Yes, after the print in the left image, ic| type(audio_file): <class '_io.BytesIO'>, the transcription "starts", so at least I should see the message transcribe.py :263 2024-02-04 22:35:25,366 Processing audio with duration 02:36:32.192, as you can see in the successful run inside the active worker with SSH:
--- Starting Serverless Worker | Version 1.6.0 ---
{"requestId": null, "message": "test_input set, using test_input as job input.", "level": "INFO"}
{"requestId": "local_test", "message": "Started.", "level": "INFO"}
credentials.py :1123 2024-02-04 22:58:15,683 Found credentials in environment variables.
ic| type(audio_file): <class '_io.BytesIO'>
transcribe.py :263 2024-02-04 22:58:26,199 Processing audio with duration 02:36:32.192
transcribe.py :317 2024-02-04 22:58:40,144 Detected language 'es' with probability 0.98
As you can see, between loading the file and starting transcription only seconds go by. But in the serverless worker being called, sometimes it just gets stuck before transcription, printing only the file type, as you can see in the image on the left. Only sometimes does the worker actually work as expected, and it is not stable. The main question is why it takes like 3 minutes doing it from SSH; the worker should work exactly like the SSH run, no? @Justin Merrell
Justin Merrell
Justin Merrell • 10mo ago
Not sure I am following why you are SSHing into your worker
bggai
bggaiOP • 10mo ago
To test the hardware. If I SSH, basically I'm using the image and also the hardware; if I run python handler_tmp.py --test_input $INPUT_DATA then results should be similar, otherwise why would you offer an SSH command to connect to the active worker? And that is not even the problem @Justin Merrell. Why on earth is this Pro GPU not working? This function was working on a pod with an A4000, and using an RTX 4090 it gets stuck?? It does not make any sense.
bggai
bggaiOP • 10mo ago
[image attachment]
bggai
bggaiOP • 10mo ago
More than 12 minutes and still the transcription has not started. It's like the code is not even being executed properly; I should be able to see at least the transcription logs, not the final segments, which are also a print.
Justin Merrell
Justin Merrell • 10mo ago
Could you paste the job ID and endpoint ID here? From the screenshots it appears that something is getting caught within the handler functions.
bggai
bggaiOP • 10mo ago
Of course, sending it right away: jyylcod6owxt9i. This only happens with larger files, it seems. I have more than 40 GB in the container disk, so it should be enough. Is there anything I can do to help debug this? @Justin Merrell
Justin Merrell
Justin Merrell • 10mo ago
Is it getting caught up trying to do CPU work? I am seeing that one of the jobs failed and was retried
bggai
bggaiOP • 10mo ago
It seems that way, but in some executions I was able to see GPU utilization.
Justin Merrell
Justin Merrell • 10mo ago
Can you add print statements while your handler code is processing things, just to confirm it is still working?
bggai
bggaiOP • 10mo ago
I just canceled the request that was retrying. I have print statements; I'll write what each should show:
1. Init -> ic| device: 'cuda', compute: 'float16'
2. Download the file from AWS S3 in bytes form; at the end we should see: ic| type(audio_file): <class '_io.BytesIO'>
3. Transcription; when it starts we should see:
transcribe.py :263 2024-02-04 22:58:26,199 Processing audio with duration 02:36:32.192
transcribe.py :317 2024-02-04 22:58:40,144 Detected language 'es' with probability 0.98
4. After finishing transcription, segments should print: ic| segments: [Segment(id=1, seek=2428,...
5. After segments, a work_dir print should appear: ic| work_dir: 'dev/tmp/test_files', since we are saving the file there
6. A final message after saving the file should appear: ic| f'{filename}_{info.language}_segments.json': '🚚🎓 1) Environments PEPE-20240131_140530-Meeting Recording_es_segments.json'
I'm using the icecream printing library (that is the ic). I cannot put more prints, as there is no middle step; these prints were designed to debug the system but are not quite helpful now in the worker (see the timing sketch below). @Justin Merrell
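For what it's worth, a minimal timing sketch that could be wrapped around the steps inside the handler (log_step is a hypothetical helper; whisper_model and download_s3_object_to_memory as in the code above, with file_path already in scope):
import time

def log_step(label: str) -> None:
    # Flush immediately so the serverless log stream shows the marker right away.
    print(f"[{time.strftime('%H:%M:%S')}] {label}", flush=True)

log_step("download start")
audio_file = download_s3_object_to_memory(file_path)
log_step("download done, transcribe start")
segments, info = whisper_model.transcribe(audio_file, beam_size=5)
# faster-whisper is lazy: most decoding work happens while consuming the generator.
segments = list(segments)
log_step("transcribe done")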
Justin Merrell
Justin Merrell • 10mo ago
Which step is it getting hung up on?
bggai
bggaiOP • 10mo ago
Number 3, the transcription never seems to start, as in this image @Justin Merrell
Justin Merrell
Justin Merrell • 10mo ago
And what code in the handler should be running that?
Madiator2011
Madiator2011 • 10mo ago
I'm suspecting that the code might not recognize the stream file as a correct file, though I won't be able to check it until morning.
bggai
bggaiOP • 10mo ago
this is the code:
async def async_transcribe_wfiles(
    audio_file,
    filename: str,
    file_path: str,
):
    # Create timestamp
    ic(type(audio_file))
    loop = asyncio.get_event_loop()

    segments, info = await loop.run_in_executor(None, lambda: whisper_model.transcribe(audio_file, beam_size=5))
    segments = await loop.run_in_executor(None, lambda: list(segments))
    ic(segments)
    # Create directory
    #os.makedirs(f'/tmp/{org_id}/{session}', exist_ok=True)
    work_dir = f'{file_path.split(f"/{filename}")[0]}'
    ic(work_dir)
    os.makedirs(work_dir, exist_ok=True)
An async version of running the faster-whisper medium model, previously cached in the Dockerfile @Justin Merrell
Justin Merrell
Justin Merrell • 10mo ago
why async?
bggai
bggaiOP • 10mo ago
It was previously handled like that, and afterwards we upload using async as well. Are you saying I should try 100% sync and maybe it'll work? Shouldn't we see an error from asyncio?
Solution
Justin Merrell
Justin Merrell • 10mo ago
Take a look at our implementation of Fast Whisper https://github.com/runpod-workers/worker-faster_whisper/blob/main/src/rp_handler.py Your code is already blocking, async is likely just introducing complexities
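For reference, a minimal fully-sync sketch of that idea, assuming the same faster-whisper model and an S3 download like the one above (the bucket name and model size are assumptions, and this is not the exact worker-faster_whisper implementation):
import io
import boto3
import runpod
from faster_whisper import WhisperModel

s3 = boto3.client("s3")
# Load the model once at import time so warm workers reuse it.
whisper_model = WhisperModel("medium", device="cuda", compute_type="float16")

def handler(job):
    job_input = job["input"]
    # Blocking download straight into memory ("my-bucket" is a placeholder).
    obj = s3.get_object(Bucket="my-bucket", Key=job_input["file_path"])
    audio_file = io.BytesIO(obj["Body"].read())
    # Everything below blocks until done; no event loop involved.
    segments, info = whisper_model.transcribe(audio_file, beam_size=5)
    return {
        "language": info.language,
        "segments": [
            {"text": s.text, "start": round(s.start, 3), "end": round(s.end, 3)}
            for s in segments
        ],
    }

runpod.serverless.start({"handler": handler})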
bggai
bggaiOP • 10mo ago
I'll try sync mode, will be posting updates here. 4 endpoints were corrected by using sync methods; I had to sacrifice 1 function that combined async logic and could not be replaced as easily. Now the testing makes sense with the results obtained via SSH. Still thinking about why it didn't work with async; everything was going smoothly on a normal GPU with that code. @Justin Merrell thanks a lot for the help and for listening. @Madiator2011 I hope you can work on support skills; basically the first 50 messages were lost and the problem was visible, you just did not ask any helpful question but just assumed. Hope you can work on that. Either way, thanks a lot. I'll be testing async logic next week; if I manage to mix it correctly I'll be posting results here so other people can benefit, thanks.
Madiator2011
Madiator2011 • 10mo ago
I'm sorry that I could not help with that, but you tagged me late at night 🙂