Twenty · 8mo ago
rush

Need help with creating message-queue tasks.

Hello, this is an awesome product and I want to start by thanking you for making it open source. I am trying to add a feature, and I've built it raw, without integrating message-queue functionality. A little description: my feature uses LangChain; it takes the required data and enriches it with more information (through an LLM). Since I am sending many records, and aim to use it for more than one user at a time, I want to integrate it with message queues. The documentation doesn't seem to be helping me, and I am a bit new to the development space. I'd be grateful if you could point me in the right direction. I have tried to look through how the Gmail and calendar sync works, but my develop-by-reference technique doesn't seem to help for my use case. This is the controller that takes the arguments and enriches the data:
@Controller('gpt-api')
export class GPTAPIController {
  constructor(private gptAPIService: GPTAPIService) {}

  @Get()
  findAll(): string {
    return 'This action returns all cats';
  }

  @Post('enrichment')
  get(@Req() req): object {
    return this.gptAPIService.enrichData(req.body.options, req.body.rawData);
  }

  @Post('process-cv')
  async getCVProcess(): Promise<object> {
    const cvProcessingService = new CVProcessing('/path/to/file.pdf');

    return await cvProcessingService.getScoresFromCustomPrompt(
      questions,
      // req.body.field_names,
    );
  }

  @Post('putting')
  put() {}
}
I want to integrate message queues such that when multiple CVs are sent (or multiple users send them at once), they are added to the message queue and executed one at a time. I am using BullMQ and Redis. I can provide more context if you want. I would appreciate any help.
9 Replies
charles · 8mo ago
@Weiko 🙂
Weiko · 8mo ago
Hi @rush! I definitely need to update that doc. You can create a job like this:
export type MyAwesomeJobData = {
  workspaceId: string;
  something: string;
};

@Injectable()
export class MyAwesomeJob implements MessageQueueJob<MyAwesomeJobData> {
  constructor(
    // You can inject things here like in a regular service
  ) {}

  async handle(data: MyAwesomeJobData): Promise<void> {
    // Do something here; this will be called automatically when your job
    // gets picked up by the worker
  }
}
To enqueue the job somewhere in your code, you will need to inject a messageQueueService like this:
@Inject(MessageQueue.myAwesomeQueue)
private readonly messageQueueService: MessageQueueService,
My suggestion would be to add this injection in your GPTAPIService, not your controller. Then you can use the messageQueueService you've just injected like this:
this.messageQueueService.add<MyAwesomeJobData>(
  MyAwesomeJob.name, // This is the job token/name
  {
    workspaceId: '1',
    something: 'something',
  }, // This is a MyAwesomeJobData payload
);
The this.messageQueueService.add call will enqueue a message with the MyAwesomeJob.name token on your myAwesomeQueue queue, with the MyAwesomeJobData payload. Since you are using Redis with bull-mq, you should now see your message under a bull:myAwesomeQueue:someUniqueIdAutoGenerated key in Redis. This key contains all the info the worker will later use to actually execute the job, by calling the handle method we implemented inside the MyAwesomeJob class.

Behind the scenes, the worker simply takes the token you pushed in your message (MyAwesomeJob.name here) when you called this.messageQueueService.add, and matches it with an actual service (your MyAwesomeJob job!). It won't be able to pick up the correct job automatically, though: the job needs to be provided in the NestJS DI container, and the worker needs to know which service (MyAwesomeJob) is mapped to which token ("MyAwesomeJob.name"), even though in our case that token is simply the name of the class. To do that, QueueWorkerModule imports JobsModule, and you will need to expose your job there, telling NestJS "when I inject MyAwesomeJob.name, I want to use the MyAwesomeJob class implementation", because that's what the worker expects. See an example here:
// jobs.module.ts
@Module({
  imports: [
    // modules needed by your job should be imported here
  ],
  providers: [
    // ...
    {
      provide: MyAwesomeJob.name,
      useClass: MyAwesomeJob,
    },
  ],
})
export class JobsModule {}
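The token-to-class matching described above can be sketched in plain TypeScript. This is a conceptual illustration only, not Twenty's actual worker code; the names mirror the example above:

```typescript
// Conceptual sketch of how the worker resolves a job token to a handler.
// Not Twenty's actual implementation; names mirror the example above.
interface MessageQueueJob<T> {
  handle(data: T): Promise<void>;
}

type MyAwesomeJobData = { workspaceId: string; something: string };

class MyAwesomeJob implements MessageQueueJob<MyAwesomeJobData> {
  public processed: MyAwesomeJobData[] = [];

  async handle(data: MyAwesomeJobData): Promise<void> {
    // In the real job, this is where your work would happen.
    this.processed.push(data);
  }
}

// The DI container is, conceptually, a map from token to job instance;
// this is what the `provide: MyAwesomeJob.name` entry sets up.
const jobRegistry = new Map<string, MessageQueueJob<any>>();
const myAwesomeJob = new MyAwesomeJob();
jobRegistry.set(MyAwesomeJob.name, myAwesomeJob);

// When the worker picks a message up, it looks the job up by its token
// and calls handle() with the message payload.
async function processMessage(token: string, payload: any): Promise<void> {
  const job = jobRegistry.get(token);
  if (!job) throw new Error(`No job registered for token "${token}"`);
  await job.handle(payload);
}
```

Note that `MyAwesomeJob.name` evaluates to the string `"MyAwesomeJob"`, which is why the worker can match the message's token back to the class.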
However, as a best practice, I would suggest not providing the job directly there, but instead importing another module into JobsModule that does it for you. (See TimelineJobModule imported in JobsModule as an example; it's simply another module layer to avoid having a huge JobsModule with imports all over the place, and it keeps things a bit cleaner.)

The last step is to run a worker to actually execute the job, which you can do with npx nx run twenty-server:worker. You can also follow the status of your jobs and your queue in Redis (feel free to check the bull-mq doc for more details, as we are simply wrapping it). Hope this helped 👍
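That extra module layer might look like the following sketch; the module name and file paths are assumptions for illustration, mirroring how TimelineJobModule is wired:

```typescript
// my-awesome-job.module.ts (hypothetical name and path)
import { Module } from '@nestjs/common';

import { MyAwesomeJob } from './my-awesome-job'; // assumed location

@Module({
  imports: [
    // modules MyAwesomeJob itself depends on go here
  ],
  providers: [
    {
      provide: MyAwesomeJob.name, // the token the worker looks up
      useClass: MyAwesomeJob, // the implementation bound to that token
    },
  ],
})
export class MyAwesomeJobModule {}
```

JobsModule then only needs MyAwesomeJobModule in its own imports array, keeping the provider wiring out of JobsModule itself.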
rush (OP) · 8mo ago
I'll follow these guidelines and get back to you. Thank you for putting in so much effort. Update: my job is executing properly, and the queue is working fine. Thank you for such detailed guidance on this. @Weiko
Weiko · 8mo ago
Great! 💪
rush
rushOP8mo ago
@Weiko I am returning the resultant processed data, but it is not being updated in the Redis cache store, and the returnValue shows as "null". Am I missing something? When I console.log the value, it shows the right output.
Weiko · 8mo ago
We don't use returnValue in our current codebase, so I can't be 100% sure it's supported. In practice your job is added to a queue and processed later by a worker, so you can't really know "when", since this is asynchronous. Because of that behavior, the return value will never be processed by the code that enqueued your job.

Jobs can query the DB or a cache if you want some persistence, but they won't return anything, since you never really know when they complete, at least with the current codebase. I think bullmq offers some kind of listeners such as onComplete, but we don't use them, and I wouldn't suggest doing so in the context of a user-facing request: you are supposed to use jobs when you know your logic will take some time and can be dealt with a bit later (and can be retried as well). You can check the code of bullmq.driver.ts if you want to modify some of the behaviors (it's a simple wrapper).
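The persistence approach described here can be sketched as follows; the in-memory Map stands in for Redis or the database, and all names are illustrative:

```typescript
// A job that persists its result instead of returning it. Return values
// are discarded by the worker, so the job writes to a store that other
// code can query later. The Map stands in for Redis or the database.
type EnrichmentResult = { recordId: string; enrichedData: string };

const resultStore = new Map<string, EnrichmentResult>();

class EnrichDataJob {
  // handle() resolves to void by design: nothing reads its return value.
  async handle(data: { recordId: string; rawData: string }): Promise<void> {
    // Stand-in for the actual LLM enrichment call.
    const enrichedData = `${data.rawData} (enriched)`;
    resultStore.set(data.recordId, { recordId: data.recordId, enrichedData });
  }
}
```

The code that enqueued the job can later look the result up by record id, instead of waiting on a return value that never arrives.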
rush (OP) · 8mo ago
Thank you for this. I've realized that the design choices the Twenty team has made are better than what I was planning to do; I may be able to achieve the same thing in a different way. I have a question, though: is there anything on the back-end side that would enable me to update the database with the results of a job once it completes? If there isn't already, I'll work on developing it; it would be really helpful for my use case. Is there something that does the following: when job A completes, send its results as a payload to job B, which updates the DB?

More context: I am enriching the existing data with more information (creating metadata field(s)) obtained from job A, and I want to update my DB when the results are in, i.e., once the job has run. @Weiko 🙂
charles · 8mo ago
@rush At the moment, I would write to the cache (Redis/bull-mq would be best here, but Postgres/pg-boss could also work). Job B can then read from the cache what job A has left for it.
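This handoff can be sketched as two jobs sharing a well-known cache key; the Maps below stand in for Redis and the database, and all names here are illustrative:

```typescript
// Job A leaves its output in the cache under a key derived from the
// record id; job B, run after A completes, reads that key and updates
// the database. Both Maps are stand-ins for the real stores.
const cache = new Map<string, string>();
const db = new Map<string, string>();

class EnrichmentJobA {
  async handle(data: { recordId: string }): Promise<void> {
    // Stand-in for the LLM enrichment work.
    const enriched = `enriched:${data.recordId}`;
    cache.set(`enrichment:${data.recordId}`, enriched);
  }
}

class DbUpdateJobB {
  async handle(data: { recordId: string }): Promise<void> {
    // Read what job A left for this record and persist it.
    const enriched = cache.get(`enrichment:${data.recordId}`);
    if (enriched !== undefined) {
      db.set(data.recordId, enriched);
    }
  }
}
```

Enqueuing job B with the same recordId payload after job A finishes is what ties the two together; the cache key is the only contract between them.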
rush (OP) · 8mo ago
Yes, that is what I have done for now 🙂 thank you