How can I make a single worker handle multiple requests concurrently before starting the next worker

Hi everyone, I’ve deployed an image generation model using a 24GB GPU with 2 workers (1 active) on RunPod. Each image generation uses around 6-7GB of memory. My goal is to have a single worker handle multiple requests concurrently until it can’t handle the load anymore, and only then should the second worker start. Right now, when I send two requests, the second worker starts immediately to process the second request, even though my first worker should have enough resources left to handle both requests at once. How can I make sure that a single worker handles multiple requests concurrently before the next worker is activated? I am new to cloud deployments, so if I get any terminology wrong, please guide me. Thanks in advance!
nerdylive · 2mo ago
Hmm, to do so I guess the most efficient and easy way is to find out how many concurrent requests (or how much request load) one worker can handle. Then set your serverless endpoint's scaling type to request count and set the number to that amount. I'd suggest not maxing out one worker, though; leave some headroom so there's no VRAM OOM error. Then use the concurrent handler and set its amount to that number too (look at the docs for more details about the concurrent handler).
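A minimal sketch of what that could look like, using the concurrency_modifier option of the runpod Python SDK's concurrent handler; the limit of 4 and the generate_image call are placeholder assumptions, not the actual setup:

import runpod

async def handler(job):
    # Placeholder: run one image generation for this job's input.
    prompt = job["input"].get("prompt", "")
    image_url = await generate_image(prompt)  # hypothetical async generation function
    return {"image_url": image_url}

def concurrency_modifier(current_concurrency):
    # Cap how many jobs one worker takes at once; leave some VRAM headroom
    # instead of maxing out the GPU.
    return 4

runpod.serverless.start({
    "handler": handler,
    "concurrency_modifier": concurrency_modifier,
})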
Ammar Ahmed (OP) · 2mo ago
Yeah, thanks, I did it using the concurrent handler. Another thing: we are developing this as a service for our app, so we have premium and regular customers who will be accessing this server for image generation. I checked the docs but I couldn't find anything related to modifying RunPod's queue management. Is there any way I can do so to give premium customers a higher priority?
nerdylive · 2mo ago
Just create a new endpoint with the same template and modify the scaling type, the scaling modifier (amount of seconds), and the concurrent amount. How about that?
Ammar Ahmed (OP) · 2mo ago
Got your point but I'm trying to manage request prioritization between premium and regular customers using the same server. What I need is a way to ensure that premium customers' requests get processed with higher priority over regular ones, without spinning up new instances or separate endpoints. I couldn't find any documentation in RunPod about queue management to allow this kind of prioritization. Essentially, I want to modify how the queue behaves so that premium requests jump ahead of regular ones in the processing order. Can I modify the queue behavior or set some priority rules for incoming requests in RunPod?
nerdylive · 2mo ago
Yeah, look at the docs; I think there is something about priority. You mean you manage it yourself inside a worker? Or pods? Or the same endpoint?
nerdylive · 2mo ago
[screenshot]
nerdylive · 2mo ago
I think that's what you're looking for, isn't it? If the feature isn't there yet, describe an example case and request it in #🧐|feedback; for now you can create your own solution. If it's not that, I guess...
Ammar Ahmed (OP) · 2mo ago
Yes, I found it in the docs and was figuring out how to implement it in Python. Will it go with the input in handler.py?
nerdylive · 2mo ago
No, if you want to implement it you modify the request, not the handler. The request, like the JSON text inside the screenshot.
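A rough sketch of what sending such a request from Python could look like, assuming the priority-related setting from the screenshot goes in the request body alongside the execution policy; the endpoint ID, API key, and prompt are placeholders, and the exact field name should be taken from the docs:

import requests

ENDPOINT_ID = "your_endpoint_id"   # placeholder
API_KEY = "your_runpod_api_key"    # placeholder

payload = {
    "input": {
        "prompt": "a photo of a mountain lake at sunrise",  # placeholder prompt
    },
    "policy": {
        "executionTimeout": 120000,  # milliseconds
        # Assumption: the priority field shown in the screenshot would go here;
        # check the execution policy docs for its exact name and values.
    },
}

response = requests.post(
    f"https://api.runpod.ai/v2/{ENDPOINT_ID}/run",
    headers={"Authorization": f"Bearer {API_KEY}"},
    json=payload,
    timeout=30,
)
print(response.json())  # returns a job id you can poll via the /status endpoint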
Ammar Ahmed (OP) · 2mo ago
ohh okay. Thanks
nerdylive · 2mo ago
Yep, you're welcome
Ammar Ahmed (OP) · 2mo ago
Setup: 2x 48GB Pro instances running with the concurrent config. Problem: I am getting variation in processing time, which I have to minimize for the app. As you can see in the screenshot, the time is too high, whereas on my local machine it takes 15-20 seconds to generate an image with this model.
[screenshot]
nerdylive · 2mo ago
I'd suggest timing it to see what's making the difference between each step of the process.
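A minimal sketch of what that timing could look like, with load_model and generate_image as hypothetical stand-ins for the actual stages in handler.py:

import time

def timed(label, fn, *args, **kwargs):
    # Run one stage and log how long it took, so the slow step stands out.
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    print(f"{label} took {time.perf_counter() - start:.2f}s")
    return result

# Hypothetical usage inside handler.py; replace with the real calls:
# pipeline = timed("model load", load_model)
# image = timed("inference", generate_image, pipeline, prompt)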
Ammar Ahmed (OP) · 2mo ago
Okay. It's taking time to load the model into memory. It also seems like concurrent requests take too long when they are processed together.
nerdylive · 2mo ago
Is that not normal? That's the longest time, yeah? FlashBoot should help by keeping workers warmed up the next time you run a request through a worker, I think.
Ammar Ahmed (OP) · 2mo ago
Yes, I have FlashBoot enabled. Also, processing a single request is fast, but when multiple requests are processed concurrently, processing is very slow. Fixed it: I created a model pool that keeps a number of models loaded according to the max concurrency. It reduced the time to below 10 seconds 😀
nerdylive · 2mo ago
What do you mean? What does it look like?
Ammar Ahmed (OP) · 2mo ago
import queue

import torch
from diffusers import DiffusionPipeline, DPMSolverMultistepScheduler


class ModelPool:
    def __init__(self, max_models=3):
        # queue.Queue is already thread-safe, so no extra lock is needed here.
        self.model_queue = queue.Queue()
        self.max_models = max_models

        # Initialize the pool with a set number of preloaded pipelines.
        for _ in range(max_models):
            self.model_queue.put(self._create_model())

    def _create_model(self):
        """Load and return a new instance of the model."""
        model_id = "SG161222/Realistic_Vision_V2.0"
        scheduler = DPMSolverMultistepScheduler.from_pretrained(model_id, subfolder="scheduler")
        pipeline = DiffusionPipeline.from_pretrained(
            model_id, scheduler=scheduler, torch_dtype=torch.float16, cache_dir="model_cache"
        )
        device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
        return pipeline.to(device)

    def get_model(self):
        """Take a pipeline from the pool (blocks until one is free)."""
        return self.model_queue.get()

    def return_model(self, model):
        """Return a pipeline to the pool so the next request can reuse it."""
        self.model_queue.put(model)


model_pool = ModelPool(max_models=10)
This is the pool, which is loaded into memory at startup. Every time a request hits the server, it gets a model from this pool.
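A rough sketch of how the pool could be wired into the concurrent handler, assuming it lives in the same handler.py as the ModelPool above; the job input shape, the output handling, and the asyncio.to_thread offloading are illustrative placeholders rather than the exact handler:

import asyncio

import runpod

async def handler(job):
    # Borrow a preloaded pipeline; with concurrency capped at the pool size,
    # this should not block for long.
    pipeline = model_pool.get_model()
    try:
        prompt = job["input"]["prompt"]
        # Run the blocking diffusers call in a thread so other jobs keep moving.
        result = await asyncio.to_thread(pipeline, prompt)
        image = result.images[0]
        image.save("/tmp/output.png")  # placeholder: upload or base64-encode instead
        return {"status": "done"}
    finally:
        # Always hand the pipeline back, even if generation fails.
        model_pool.return_model(pipeline)

runpod.serverless.start({
    "handler": handler,
    # Allow as many concurrent jobs on this worker as there are pooled models.
    "concurrency_modifier": lambda current_concurrency: model_pool.max_models,
})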
nerdylive · 2mo ago
Ooh cool