C# · 3mo ago
thomas

Kafka Flow (Producer Consumer arch.)

I'm having a small issue registering consumers on Kafka (using KafkaFlow).
[image]
23 Replies
thomas (OP) · 3mo ago
The logic is extremely simple.
thomas (OP) · 3mo ago
The employee MS creates an employee, saves it to its own DB, and then pushes the ID to Kafka.
[image]
thomas (OP) · 3mo ago
I want the payroll MS to "listen" on the queue and, for now, just print the IDs to the screen.
canton7 · 3mo ago
I'm not sure what the question is, I'm afraid.
thomas (OP) · 3mo ago
You're right, it's because I forgot to post it. I created this little service:
```c#
using KafkaFlow;
using Payroll.Shared;

namespace Payroll.API;

public class EmployeeCreatedHandler(IServiceProvider serviceProvider, ILogger<EmployeeCreatedHandler> logger)
    : IMessageHandler<EmployeeCreatedEvent>
{
    public Task Handle(IMessageContext context, EmployeeCreatedEvent message)
    {
        logger.LogInformation("EmployeeCreatedHandler: EmployeeId={EmployeeId}", message.EmployeeId);
        return Task.CompletedTask;
    }
}
```
When I create an employee, the payroll service throws badly, let me show you.
thomas (OP) · 3mo ago
[image]
thomas (OP) · 3mo ago
On the left, the employee MS, which works fine. On the right, the payroll MS, which just hangs:
fail: Microsoft.Extensions.Hosting.Internal.Host[11]
Hosting failed to start
System.ArgumentException: An item with the same key has already been added. Key: 6692eb0d-402a-40cd-ba7b-405e7a104e23
at System.Collections.Generic.Dictionary`2.TryInsert(TKey key, TValue value, InsertionBehavior behavior)
canton7 · 3mo ago
How do those two errors relate to each other?
thomas (OP) · 3mo ago
The first one happened because I didn't initialise the consumer properly.
canton7 · 3mo ago
The MissingMethodException is normally due to something like running against an older version of a DLL than the one you compiled against.
I feel like you've just dropped a few maybe-related things here, and I'm having a hard time stitching everything together and working out what you're actually doing and what's going wrong.
thomas (OP) · 3mo ago
Can I show you what my producers and consumers look like?
Yeah, my bad, I'm sorry.
Payroll MS (it's inside the Program.cs file):
```c#
// Configure KafkaFlow
const string topicName = "employee-topic";
const string groupName = "payroll-consumer-group";

builder.Services.AddKafkaFlowHostedService(
    kafka => kafka
        .UseMicrosoftLog()
        .AddCluster(cluster => cluster
            .WithBrokers(new[] { "localhost:9092" })
            .AddConsumer(consumer =>
                consumer
                    .Topic(topicName)
                    .WithGroupId(groupName)
                    .WithBufferSize(100)
                    .WithWorkersCount(4)
                    .WithAutoOffsetReset(AutoOffsetReset.Latest)
                    .AddMiddlewares(middlewares => middlewares
                        .AddSerializer<JsonCoreSerializer>()
                        .AddTypedHandlers(handlers => handlers
                            .AddHandler<EmployeeCreatedHandler>()
                        )
                    )
            )
        )
);
```
Employee MS:
```c#
// Configure KafkaFlow
const string topicName = "employee-topic";
const string producerName = "employee-producer";

builder.Services.AddKafka(
    kafka => kafka
        .UseMicrosoftLog()
        .AddCluster(
            cluster => cluster
                .WithBrokers(new[] { "localhost:9092" })
                .CreateTopicIfNotExists(topicName, 1, 1)
                .AddProducer(
                    producerName,
                    producer => producer
                        .DefaultTopic(topicName)
                        .AddMiddlewares(m =>
                            m.AddSerializer<JsonCoreSerializer>()
                        )
                )
        )
);
```
The Employee MS one is how I set up my producer: when an employee is created, a message should be pushed to the queue.
thomas (OP) · 3mo ago
And that works fine.
[image]
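For readers following along, publishing through the named producer might look roughly like the sketch below. It assumes KafkaFlow's `IProducerAccessor` is injected by the DI container after `AddKafka(...)`. The `EmployeeEventPublisher` class and the shape of `EmployeeCreatedEvent` (a single `Guid EmployeeId`) are hypothetical, since the thread never shows the actual event type or the code that produces it.

```c#
using System;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Producers;

// Hypothetical event shape; the thread only shows that it has an EmployeeId.
public record EmployeeCreatedEvent(Guid EmployeeId);

// Hypothetical helper that the employee MS could call after saving to its DB.
public class EmployeeEventPublisher(IProducerAccessor producers)
{
    public Task PublishCreatedAsync(Guid employeeId)
    {
        // "employee-producer" matches the producerName used in AddProducer above.
        var producer = producers.GetProducer("employee-producer");

        // No topic is passed, so the producer's DefaultTopic ("employee-topic") is used.
        // The employee ID doubles as the message key here; that is an assumption.
        return producer.ProduceAsync(employeeId.ToString(), new EmployeeCreatedEvent(employeeId));
    }
}
```

The consumer side resolves its typed handler from the message type, so the event class the producer sends and the one the handler expects need to line up.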
thomas (OP) · 3mo ago
I'm just not sure how to get the consumer to work.
thomas (OP) · 3mo ago
Adding this in the Program.cs file (where I have all my ASP.NET stuff) makes my program crash
[image]
thomas (OP) · 3mo ago
with this error right here.
thomas (OP) · 3mo ago
Let me send a pic so you can see better.
[image]
thomas (OP) · 3mo ago
@canton7 Hope this helps. If you want, I can also send the repository where all this lives.
thomas (OP) · 3mo ago
OK, so the activation isn't needed. The issue then is just this:
thomas (OP) · 3mo ago
[image]
thomas (OP) · 3mo ago
lol, the issue is that I should use a deserializer in the consumer and I wrote serializer.
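A minimal sketch of what that change could look like in the payroll MS, assuming KafkaFlow v3 with the `KafkaFlow.Serializer.JsonCore` package, where the consumer-side middleware is registered with `AddDeserializer<JsonCoreDeserializer>()`. Older versions may use different names, so treat this as an illustration and not as the exact code from the repository.

```c#
using KafkaFlow;
using KafkaFlow.Serializer;
using Payroll.API;

var builder = WebApplication.CreateBuilder(args);

const string topicName = "employee-topic";
const string groupName = "payroll-consumer-group";

builder.Services.AddKafkaFlowHostedService(
    kafka => kafka
        .UseMicrosoftLog()
        .AddCluster(cluster => cluster
            .WithBrokers(new[] { "localhost:9092" })
            .AddConsumer(consumer => consumer
                .Topic(topicName)
                .WithGroupId(groupName)
                .WithBufferSize(100)
                .WithWorkersCount(4)
                .WithAutoOffsetReset(AutoOffsetReset.Latest)
                .AddMiddlewares(middlewares => middlewares
                    // Consumers read bytes off the topic, so they need a deserializer;
                    // the serializer stays on the producer side in the employee MS.
                    .AddDeserializer<JsonCoreDeserializer>()
                    .AddTypedHandlers(handlers => handlers
                        .AddHandler<EmployeeCreatedHandler>())))));

builder.Build().Run();
```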
thomas (OP) · 3mo ago
OK, last issue, I swear.
[image]
thomas (OP) · 3mo ago
Now it works (as in, it doesn't crash), but the service doesn't print anything.