need help with creating message-queue tasks.
Hello, it is an awesome product and I want to start by thanking you guys for making it open source. I am trying to add a feature and I've built it raw, without integrating message-queue functionality into it. A little description: my feature uses langchain; it takes the required data and enriches it with more information (through an LLM). Now, since I am sending many records and aim to use it for more than one user at a time, I want to integrate it with message queues. The documentation doesn't seem to be helping me, and I am also a bit new to the development space. I'd be grateful if you could point me in the right direction. I have tried to read through how the Gmail and Calendar sync works, but developing by reference to it hasn't helped for my use case.
So, this is the controller that is taking the arguments and working to enrich the data:
I want to integrate message queues such that when I send multiple CVs (or through multiple users at once) it adds to the message queue and then executes them one at a time. I am using Bull-MQ and Redis. I can provide more context if you want. I would appreciate any help.
@Weiko 🙂
Hi @rush! I definitely need to update that doc indeed.
You can create a job like this
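The snippet that accompanied this message did not survive in the thread. As a rough sketch of what such a job could look like: the `MessageQueueJob` interface below is a local stand-in written for this example (an assumption, not copied from the twenty codebase), and `recordId` is a made-up payload field. Only the names `MyAwesomeJob`, `MyAwesomeJobData`, and `handle` come from the discussion.

```typescript
// Local stand-in for the job contract; an assumption, not the real twenty interface.
interface MessageQueueJob<T> {
  handle(data: T): Promise<void>;
}

// Hypothetical payload shape: whatever the worker needs to do its work.
type MyAwesomeJobData = {
  recordId: string;
};

class MyAwesomeJob implements MessageQueueJob<MyAwesomeJobData> {
  // Kept only so the example has an observable effect.
  readonly handled: string[] = [];

  async handle(data: MyAwesomeJobData): Promise<void> {
    // The slow work (for example the LLM enrichment) would go here.
    this.handled.push(data.recordId);
  }
}
```

The payload is kept small on purpose: it is serialized into the queue, so passing an identifier and loading the record inside `handle` is usually the safer choice.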
To enqueue the job somewhere in your code, you will need to inject a messageQueueService like this
My suggestion would be to add this injection in your GPTAPIService and not your controller.
Then you can use the messageQueueService you've just injected like this
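The two snippets referenced here (the injection and the call) were lost from the thread. A minimal sketch of both steps, using in-memory stand-ins so that it runs on its own: `InMemoryMessageQueueService` is hypothetical and only mimics the `add(token, payload)` call described in this thread, whereas in the real codebase the service would be injected by NestJS and backed by bull-mq. `enrich` and `recordId` are illustrative names.

```typescript
type MyAwesomeJobData = { recordId: string };

class MyAwesomeJob {
  async handle(data: MyAwesomeJobData): Promise<void> {
    console.log(`enriching ${data.recordId}`);
  }
}

// Hypothetical stand-in for the injected queue service.
class InMemoryMessageQueueService {
  readonly messages: { token: string; data: unknown }[] = [];

  add<T>(token: string, data: T): void {
    // A real driver would push this to Redis; here it is only recorded.
    this.messages.push({ token, data });
  }
}

class GPTAPIService {
  private readonly messageQueueService: InMemoryMessageQueueService;

  // With NestJS this argument would be supplied by the DI container.
  constructor(messageQueueService: InMemoryMessageQueueService) {
    this.messageQueueService = messageQueueService;
  }

  enrich(recordId: string): void {
    // Enqueue and return immediately; a worker runs the job later.
    this.messageQueueService.add<MyAwesomeJobData>(MyAwesomeJob.name, { recordId });
  }
}
```

Calling `enrich` once per CV produces one queued message per CV, which is what lets the worker process them one at a time.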
This line, this.messageQueueService.add, will enqueue a message with a MyAwesomeJob.name token to your myAwesomeQueue queue, with the payload MyAwesomeJobData.
Since you are using Redis with bull-mq, you should now be able to see your message under the bull:myAwesomeQueue:someUniqueIdAutoGenerated key in your Redis. This key should contain all the info that the worker will later use to actually execute the job by calling the handle method that we had to implement inside the MyAwesomeJob class.
Behind the scenes, the worker will simply take the token you pushed in your message (MyAwesomeJob.name here) when you called this.messageQueueService.add earlier and match it with an actual service (your MyAwesomeJob job!).
It won't be able to pick up the correct job automatically, though: the job needs to be provided in the NestJS DI container, and the container needs to know which service (MyAwesomeJob) is mapped to which token ("MyAwesomeJob.name"), even though this is simply the name of the class in our case.
To do that, QueueWorkerModule imports JobsModule, and you will need to expose your job there and tell NestJS "When I inject "MyAwesomeJob.name", I want to use the MyAwesomeJob class implementation", because that's what the worker expects.
See an example here:
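The linked example is not preserved here. As an illustration of the mapping being described: a NestJS class provider has the shape `{ provide: token, useClass: Class }`, which is what ties a string token to an implementation. The `resolveJob` helper below is hypothetical and only imitates the container lookup so that the sketch runs without NestJS.

```typescript
type MyAwesomeJobData = { recordId: string };

interface MessageQueueJob<T> {
  handle(data: T): Promise<void>;
}

class MyAwesomeJob implements MessageQueueJob<MyAwesomeJobData> {
  async handle(data: MyAwesomeJobData): Promise<void> {
    console.log(`handling ${data.recordId}`);
  }
}

// Same shape as a NestJS class provider: token on the left, implementation on the right.
const jobProviders = [
  { provide: MyAwesomeJob.name, useClass: MyAwesomeJob },
];

// Hypothetical stand-in for the container lookup the worker relies on.
function resolveJob(token: string): MessageQueueJob<MyAwesomeJobData> {
  for (const provider of jobProviders) {
    if (provider.provide === token) {
      return new provider.useClass();
    }
  }
  throw new Error(`No job registered for token "${token}"`);
}
```

If the provider entry is missing, the lookup fails even though the message sits in the queue, which matches the "it won't pick up the job automatically" behaviour described above.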
However, as a best practice, I would suggest not providing the job there directly but instead importing into JobsModule another module that you create to do that for you. (See TimelineJobModule, imported in JobsModule, as an example. It's simply another module layer that avoids having a huge JobsModule with imports all over the place and keeps things a bit cleaner.)
Now the last step is to run a worker to actually execute the job; you can do that with npx nx run worker twenty-server. You can also follow the status of your jobs and your queue in Redis (feel free to check the bull-mq docs for more details, as we are simply wrapping it).
Hope this helped 👍
I'll follow the said guidelines and get back to you about it. Thank you for putting in so much effort.
So my job is executing properly, and the queue is working fine. Thank you for such detailed guidance on this. @Weiko
Great! 💪
@Weiko I am returning the processed data, but it is not being updated in the Redis cache store, and returnValue shows as "null". Am I missing something?
But when I console.log the value it shows the right output
We don't use returnValue in our current codebase, so I can't be 100% sure it's supported. In practice your job is added to a queue and processed later by a worker, so you can't really know when it will run, since this is asynchronous. Because of that, the return value will never be processed by the code that enqueued your job.
Jobs can query the DB or a cache if you want some persistence, but they won't return anything since you never really know when they are completed, at least with the current codebase. I think bullmq offers some kind of listeners such as onComplete, but we don't do that, and I wouldn't suggest it in the context of a user-facing request: you are supposed to use jobs when you know your logic will take some time and can be dealt with a bit later (and can be retried as well).
You can check the code of bullmq.driver.ts if you want to modify some of the behaviors (it's a simple wrapper).
Thank you for this. I have identified that the design choices that the twenty team has made are better than what I was thinking of doing. I may be able to do the same in a different way.
I have a question, though: is there anything on the back-end side of things that will enable me to update the database with the results of the job when it is completed? I'll work on developing this if it doesn't exist already. It would be really helpful for my use case.
Is there something like: when job-A completes, send its results as a payload to job-B, which updates the DB?
more context:
I am enriching the existing data with more information (creating metadata field(s)) obtained from job-A and want to update my DB when the results are in (i.e., once the job has run). @Weiko 🙂
@rush at the moment, I would write in the cache (redis/bull-mq would be the best here, but postgres/pg-boss could also work). Job B can read from the cache what job A has left for it.
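A small sketch of that hand-off, under loud assumptions: `cache`, `database`, and `queue` are plain in-memory stand-ins (in practice Redis or Postgres and the real message queue), and `JobA`, `JobB`, and `recordId` are illustrative names. The point is only the flow: job A stores its result under a key and enqueues job B with that key, and job B reads the result and writes it to the database.

```typescript
type HandoffData = { recordId: string };

// In-memory stand-ins; assumptions made so the example runs on its own.
const cache: Record<string, string> = {};
const database: Record<string, string> = {};
const queue: { token: string; data: HandoffData }[] = [];

class JobB {
  async handle(data: HandoffData): Promise<void> {
    const enriched = cache[data.recordId];
    if (enriched === undefined) {
      throw new Error(`Nothing cached for ${data.recordId}`);
    }
    database[data.recordId] = enriched; // persist the result
    delete cache[data.recordId];        // clean up the hand-off entry
  }
}

class JobA {
  async handle(data: HandoffData): Promise<void> {
    const enriched = `enriched:${data.recordId}`; // placeholder for the LLM output
    cache[data.recordId] = enriched;              // leave the result for job B
    queue.push({ token: JobB.name, data });       // pass only the key along
  }
}
```

Passing only the key keeps queue messages small, and job B failing loudly on a missing cache entry lets the queue's retry mechanism deal with ordering problems.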
Yes, that is what I have done for now 🙂 thank you