C
C#3w ago
юрий

MassTransit + EventHub with own message format

I am trying to consume EventHub messages that are in my own format. They are not wrapped in a MassTransit envelope, and I cannot change the format, because it is an existing EventHub. Now the problem is that in theory, according to my beginner knowledge of MassTransit, this should consume my messages, however, it does not. I set some breakpoints in MassTransit and in the screenshot it appears to be where it gets stuck, and gets 'dead lettered' by MassTransit. What I am trying to do should be possible, right? My payload
{
"Type": "ExampleMessage"
}
{
"Type": "ExampleMessage"
}
My very basic configuration with EventHub emulator and Azurite
var builder = Host.CreateApplicationBuilder();
builder.Services
.AddMassTransit(x =>
{
x.UsingInMemory();
x.AddRider(rider =>
{
rider.AddConsumer<EventHubMessageConsumer>();

rider.UsingEventHub((context, k) =>
{
k.Host(
"Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;");
k.Storage("UseDevelopmentStorage=true");

k.ReceiveEndpoint("eh1", c =>
{
c.UseRawJsonSerializer(RawSerializerOptions.AnyMessageType, true);
c.ConfigureConsumer<EventHubMessageConsumer>(context);
});
});
});
});

var host = builder.Build();
await host.RunAsync();

public class EventHubMessageConsumer :
IConsumer<EventHubMessage>
{
public Task Consume(ConsumeContext<EventHubMessage> context)
{
return Task.CompletedTask;
}
}

public record EventHubMessage
{
public string Type { get; init; }
}
var builder = Host.CreateApplicationBuilder();
builder.Services
.AddMassTransit(x =>
{
x.UsingInMemory();
x.AddRider(rider =>
{
rider.AddConsumer<EventHubMessageConsumer>();

rider.UsingEventHub((context, k) =>
{
k.Host(
"Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;");
k.Storage("UseDevelopmentStorage=true");

k.ReceiveEndpoint("eh1", c =>
{
c.UseRawJsonSerializer(RawSerializerOptions.AnyMessageType, true);
c.ConfigureConsumer<EventHubMessageConsumer>(context);
});
});
});
});

var host = builder.Build();
await host.RunAsync();

public class EventHubMessageConsumer :
IConsumer<EventHubMessage>
{
public Task Consume(ConsumeContext<EventHubMessage> context)
{
return Task.CompletedTask;
}
}

public record EventHubMessage
{
public string Type { get; init; }
}
No description
0 Replies
No replies yetBe the first to reply to this messageJoin

Did you find this page helpful?