Azure Event Hub Configuration
Azure Event Hub is included as a Rider and supports consuming messages from, and producing messages to, Azure event hubs.
Uses MassTransit.Azure.ServiceBus.Core, MassTransit.EventHub
To consume messages from an event hub, configure a Rider within the bus configuration as shown.
namespace EventHubConsumer;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingAzureServiceBus((context, cfg) =>
            {
                cfg.Host("connection-string");

                cfg.ConfigureEndpoints(context);
            });

            x.AddRider(rider =>
            {
                rider.AddConsumer<EventHubMessageConsumer>();

                rider.UsingEventHub((context, k) =>
                {
                    k.Host("connection-string");

                    k.Storage("connection-string");

                    k.ReceiveEndpoint("input-event-hub", c =>
                    {
                        c.ConfigureConsumer<EventHubMessageConsumer>(context);
                    });
                });
            });
        });
    }

    class EventHubMessageConsumer :
        IConsumer<EventHubMessage>
    {
        public Task Consume(ConsumeContext<EventHubMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record EventHubMessage
    {
        public string Text { get; init; }
    }
}
The familiar ReceiveEndpoint syntax is used to configure an event hub. When a consumer group is specified, it should be unique to the application and shared by the cluster of service instances so that they load balance. Consumers and sagas that are registered in the rider configuration can be configured on the receive endpoint. Although an event hub is configured in the same way as a receive endpoint, consumer message types are not implicitly bound to event hubs (there is no pub-sub using Event Hub).
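As a minimal sketch of the consumer group guidance above, the registration below names the consumer group explicitly. It assumes the ReceiveEndpoint overload that takes the consumer group as its second argument. The extension method name and the group name "order-service" are hypothetical, and the group must already exist on the event hub.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public record EventHubMessage
{
    public string Text { get; init; }
}

public class EventHubMessageConsumer :
    IConsumer<EventHubMessage>
{
    public Task Consume(ConsumeContext<EventHubMessage> context)
    {
        return Task.CompletedTask;
    }
}

public static class ConsumerGroupRegistration
{
    // Hypothetical helper; call it from the application's service registration.
    public static IServiceCollection AddEventHubConsumer(this IServiceCollection services)
    {
        return services.AddMassTransit(x =>
        {
            x.UsingAzureServiceBus((context, cfg) =>
            {
                cfg.Host("connection-string");
                cfg.ConfigureEndpoints(context);
            });

            x.AddRider(rider =>
            {
                // The consumer is registered on the rider, not on the bus.
                rider.AddConsumer<EventHubMessageConsumer>();

                rider.UsingEventHub((context, k) =>
                {
                    k.Host("connection-string");
                    k.Storage("connection-string");

                    // Assumed overload: (event hub name, consumer group, configure).
                    // Every instance of this service uses the same group name.
                    k.ReceiveEndpoint("input-event-hub", "order-service", c =>
                    {
                        c.ConfigureConsumer<EventHubMessageConsumer>(context);
                    });
                });
            });
        });
    }
}
```

Because message types are not bound to event hubs implicitly, each consumer has to be attached to an endpoint with ConfigureConsumer, as shown here.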
Configuration
Checkpoint
The rider implementation takes full responsibility for checkpointing, and this behavior cannot be replaced. The checkpointer can be configured per event hub through the following properties:
Name | Description | Default |
---|---|---|
CheckpointInterval | Checkpoint frequency based on time | 1 min |
CheckpointMessageCount | Checkpoint every X messages | 5000 |
MessageLimit | Checkpoint buffer size without blocking consumption | 10000 |
During a graceful shutdown, the checkpointer attempts to checkpoint all messages that have already been consumed. Avoid forced shutdowns, because messages consumed after the last checkpoint may be consumed again.
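The checkpoint properties above might be set on the receive endpoint as in the following sketch. It assumes that the three names in the table are settable properties on the endpoint configurator passed to ReceiveEndpoint. The helper name and the values are hypothetical, chosen only to show checkpointing more often than the defaults.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public record EventHubMessage
{
    public string Text { get; init; }
}

public class EventHubMessageConsumer :
    IConsumer<EventHubMessage>
{
    public Task Consume(ConsumeContext<EventHubMessage> context)
    {
        return Task.CompletedTask;
    }
}

public static class CheckpointRegistration
{
    // Hypothetical helper; call it from the application's service registration.
    public static IServiceCollection AddCheckpointedConsumer(this IServiceCollection services)
    {
        return services.AddMassTransit(x =>
        {
            x.UsingAzureServiceBus((context, cfg) =>
            {
                cfg.Host("connection-string");
                cfg.ConfigureEndpoints(context);
            });

            x.AddRider(rider =>
            {
                rider.AddConsumer<EventHubMessageConsumer>();

                rider.UsingEventHub((context, k) =>
                {
                    k.Host("connection-string");
                    k.Storage("connection-string");

                    k.ReceiveEndpoint("input-event-hub", c =>
                    {
                        // Checkpoint at least every 30 seconds (default: 1 min).
                        c.CheckpointInterval = TimeSpan.FromSeconds(30);

                        // Checkpoint after every 1000 messages (default: 5000).
                        c.CheckpointMessageCount = 1000;

                        // Buffer up to 5000 messages awaiting a checkpoint
                        // before consumption blocks (default: 10000).
                        c.MessageLimit = 5000;

                        c.ConfigureConsumer<EventHubMessageConsumer>(context);
                    });
                });
            });
        });
    }
}
```

More frequent checkpoints reduce the number of messages that can be consumed again after a forced shutdown, at the cost of more writes to the storage account.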
Scalability
Riders are designed with performance in mind, handling each event hub partition within a separate thread pool. Consumption within a single partition can also be scaled up by using a PartitionKey: messages with different keys are processed concurrently, while ordering is preserved for messages that share a key.
Name | Description | Default |
---|---|---|
ConcurrentDeliveryLimit | Number of messages delivered concurrently within the same partition and PartitionKey. Increasing this value breaks ordering, which is useful for event hubs where ordering is not required | 1 |
ConcurrentMessageLimit | Number of messages processed concurrently across different keys (preserving ordering). When every message in a partition has the same key, ConcurrentDeliveryLimit is used instead | 1 |
PrefetchCount | Number of messages to prefetch from the event hub into memory | 1000 |
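The sketch below shows one way the scalability settings above could be combined so that ordering per PartitionKey is kept. It assumes the names in the table are settable properties on the endpoint configurator. The helper name and the values are hypothetical.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public record EventHubMessage
{
    public string Text { get; init; }
}

public class EventHubMessageConsumer :
    IConsumer<EventHubMessage>
{
    public Task Consume(ConsumeContext<EventHubMessage> context)
    {
        return Task.CompletedTask;
    }
}

public static class ConcurrencyRegistration
{
    // Hypothetical helper; call it from the application's service registration.
    public static IServiceCollection AddConcurrentConsumer(this IServiceCollection services)
    {
        return services.AddMassTransit(x =>
        {
            x.UsingAzureServiceBus((context, cfg) =>
            {
                cfg.Host("connection-string");
                cfg.ConfigureEndpoints(context);
            });

            x.AddRider(rider =>
            {
                rider.AddConsumer<EventHubMessageConsumer>();

                rider.UsingEventHub((context, k) =>
                {
                    k.Host("connection-string");
                    k.Storage("connection-string");

                    k.ReceiveEndpoint("input-event-hub", c =>
                    {
                        // Keep up to 500 messages in memory (default: 1000).
                        c.PrefetchCount = 500;

                        // Process up to 8 messages at once when their keys differ.
                        c.ConcurrentMessageLimit = 8;

                        // Leave at 1 so messages sharing a key stay ordered.
                        c.ConcurrentDeliveryLimit = 1;

                        c.ConfigureConsumer<EventHubMessageConsumer>(context);
                    });
                });
            });
        });
    }
}
```

Raising ConcurrentDeliveryLimit above 1 is only appropriate when the consumer does not depend on the order of messages that share a key.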
Producers
Producing messages to event hubs uses a producer. In the example below, a message is produced to the event hub.
namespace EventHubProducer;

using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingAzureServiceBus((context, cfg) =>
            {
                cfg.Host("connection-string");

                cfg.ConfigureEndpoints(context);
            });

            x.AddRider(rider =>
            {
                rider.UsingEventHub((context, k) =>
                {
                    k.Host("connection-string");

                    k.Storage("connection-string");
                });
            });
        });

        var provider = services.BuildServiceProvider(true);

        var busControl = provider.GetRequiredService<IBusControl>();

        // Allow up to 10 seconds for the bus to start.
        await busControl.StartAsync(new CancellationTokenSource(10000).Token);
        try
        {
            // Resolve the producer provider from a scope, and dispose the scope when done.
            using var serviceScope = provider.CreateScope();

            var producerProvider = serviceScope.ServiceProvider.GetRequiredService<IEventHubProducerProvider>();
            var producer = await producerProvider.GetProducer("some-event-hub");

            await producer.Produce<EventHubMessage>(new { Text = "Hello, Computer." });
        }
        finally
        {
            // Stop the bus so that it shuts down gracefully.
            await busControl.StopAsync(new CancellationTokenSource(10000).Token);
        }
    }

    public record EventHubMessage
    {
        public string Text { get; init; }
    }
}
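In a hosted application the producer is more likely to be used from a class that receives IEventHubProducerProvider through constructor injection. The following is a minimal sketch under that assumption. GreetingSender is a hypothetical class, and the Produce overload that accepts a CancellationToken is assumed.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

public record EventHubMessage
{
    public string Text { get; init; }
}

// Hypothetical application service that wraps the event hub producer.
public class GreetingSender
{
    readonly IEventHubProducerProvider _producerProvider;

    public GreetingSender(IEventHubProducerProvider producerProvider)
    {
        _producerProvider = producerProvider;
    }

    public async Task Send(string text, CancellationToken cancellationToken = default)
    {
        // Look up the producer for the target event hub by name.
        var producer = await _producerProvider.GetProducer("some-event-hub");

        // The anonymous object is used to initialize an EventHubMessage.
        await producer.Produce<EventHubMessage>(new { Text = text }, cancellationToken);
    }
}
```

Since the example above resolves the producer provider from a scope, a class like this would normally be registered as scoped as well, for instance with services.AddScoped<GreetingSender>().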