Azure Event Hub Configuration

Azure Event Hub is included as a Rider, and supports consuming and producing messages from/to Azure event hubs.

Uses MassTransit.Azure.ServiceBus.Core, MassTransit.EventHub

To consume messages from an event hub, configure a Rider within the bus configuration as shown.

namespace EventHubConsumer;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingAzureServiceBus((context, cfg) =>
            {
                cfg.Host("connection-string");

                cfg.ConfigureEndpoints(context);
            });

            x.AddRider(rider =>
            {
                rider.AddConsumer<EventHubMessageConsumer>();

                rider.UsingEventHub((context, k) =>
                {
                    k.Host("connection-string");

                    k.Storage("connection-string");

                    k.ReceiveEndpoint("input-event-hub", c =>
                    {
                        c.ConfigureConsumer<EventHubMessageConsumer>(context);
                    });
                });
            });
        });
    }

    class EventHubMessageConsumer :
        IConsumer<EventHubMessage>
    {
        public Task Consume(ConsumeContext<EventHubMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record EventHubMessage
    {
        public string Text { get; init; }
    }
}

The familiar ReceiveEndpoint syntax is used to configure an event hub. The consumer group specified should be unique to the application, and shared by a cluster of service instances for load balancing. Consumers and sagas can be configured on the receive endpoint, which should be registered in the rider configuration. While the configuration for event hubs is the same as a receive endpoint, there is no implicit binding of consumer message types to event hubs (there is no pub-sub using event hub).

Configuration

Checkpoint

Rider implementation is taking full responsibility of Checkpointing, there is no ability to change it. Checkpointer can be configured on topic bases through next properties:

NameDescriptionDefault
CheckpointIntervalCheckpoint frequency based on time1 min
CheckpointMessageCountCheckpoint every X messages5000
MessageLimitCheckpoint buffer size without blocking consumption10000
Please note, each topic partition has it's own checkpointer and configuration is applied to partition and not to entire topic.

During graceful shutdown Checkpointer will try to "checkpoint" all already consumed messages. Force shutdown should be avoided to prevent multiple consumption for the same message.

Scalability

Riders are designed with performance in mind, handling each topic partition withing separate threadpool. As well, allowing to scale-up consumption within same partition by using PartitionKey, as long as keys are different they will be processed concurrently and all this without sacrificing ordering.

NameDescriptionDefault
ConcurrentDeliveryLimitNumber of Messages delivered concurrently within same partition + PartitionKey. Increasing this value will break ordering, helpful for topics where ordering is not required1
ConcurrentMessageLimitNumber of Messages processed concurrently witin different keys (preserving ordering). When keys are the same for entire partition ConcurrentDeliveryLimit will be used instead1
PrefetchCountNumber of Messages to prefetch from kafka topic into memory1000

Producers

Producing messages to event hubs uses a producer. In the example below, a messages is produced to the event hub.

namespace EventHubProducer;

using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingAzureServiceBus((context, cfg) =>
            {
                cfg.Host("connection-string");

                cfg.ConfigureEndpoints(context);
            });

            x.AddRider(rider =>
            {
                rider.UsingEventHub((context, k) =>
                {
                    k.Host("connection-string");

                    k.Storage("connection-string");
                });
            });
        });

        var provider = services.BuildServiceProvider(true);

        var busControl = provider.GetRequiredService<IBusControl>();

        await busControl.StartAsync(new CancellationTokenSource(10000).Token);

        var serviceScope = provider.CreateScope();

        var producerProvider = serviceScope.ServiceProvider.GetRequiredService<IEventHubProducerProvider>();
        var producer = await producerProvider.GetProducer("some-event-hub");

        await producer.Produce<EventHubMessage>(new { Text = "Hello, Computer." });
    }

    public record EventHubMessage
    {
        public string Text { get; init; }
    }
}
Table of Contents