Kafka Configuration

Kafka is supported as a Rider, enabling messages to be consumed from and produced to Kafka topics. The Confluent .NET client is used, and has been tested against the community edition (running in Docker).

MassTransit/Sample-Kafka

This sample shows how to use MassTransit with Kafka, including Confluent Cloud.

Topic Endpoints

Uses the MassTransit.Kafka NuGet package

To consume a Kafka topic, configure a Rider within the bus configuration as shown.

namespace KafkaConsumer;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            // riders require a bus; the in-memory bus is sufficient when only Kafka is used
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer>();

                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");

                    k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
                    {
                        e.ConfigureConsumer<KafkaMessageConsumer>(context);
                    });
                });
            });
        });
    }

    class KafkaMessageConsumer :
        IConsumer<KafkaMessage>
    {
        public Task Consume(ConsumeContext<KafkaMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}

A TopicEndpoint connects a Kafka Consumer to a topic, using the specified topic name. The consumer group specified should be unique to the application, and shared by a cluster of service instances for load balancing. Consumers and sagas can be configured on the topic endpoint, which should be registered in the rider configuration. While the configuration for topic endpoints is the same as a receive endpoint, there is no implicit binding of consumer message types to Kafka topics. The message type is specified on the TopicEndpoint as a generic argument.
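
Sagas are connected in the same way. The following is a minimal sketch of a saga state machine on a topic endpoint; OrderState, OrderStateMachine, OrderSubmitted, and the topic and group names are hypothetical names used for illustration:

namespace KafkaSagaConsumer;

using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                // register the saga state machine with an in-memory repository
                rider.AddSagaStateMachine<OrderStateMachine, OrderState>()
                    .InMemoryRepository();

                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");

                    k.TopicEndpoint<OrderSubmitted>("order-events", "order-service", e =>
                    {
                        // connect the registered saga to the topic endpoint
                        e.ConfigureSaga<OrderState>(context);
                    });
                });
            });
        });
    }

    public record OrderSubmitted
    {
        public Guid OrderId { get; init; }
    }

    public class OrderState :
        SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }
    }

    public class OrderStateMachine :
        MassTransitStateMachine<OrderState>
    {
        public OrderStateMachine()
        {
            InstanceState(x => x.CurrentState);

            Event(() => Submitted, x => x.CorrelateById(m => m.Message.OrderId));

            Initially(
                When(Submitted)
                    .TransitionTo(Pending));
        }

        public State Pending { get; private set; }
        public Event<OrderSubmitted> Submitted { get; private set; }
    }
}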

Wildcard support

Kafka supports subscribing to multiple topics at once using a regular expression (also called a wildcard) that matches the topic names:

namespace KafkaWildcardConsumer;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer>();

                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");

                    k.TopicEndpoint<KafkaMessage>("^topic-[0-9]*", "consumer-group-name", e =>
                    {
                        e.ConfigureConsumer<KafkaMessageConsumer>(context);
                    });
                });
            });
        });
    }

    class KafkaMessageConsumer :
        IConsumer<KafkaMessage>
    {
        public Task Consume(ConsumeContext<KafkaMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}

Configuration

Configuration is available through the Confluent client configs, or individual values can be overridden using the configurator.
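
For example, a minimal sketch passing the Confluent client configs directly; the ClientConfig and ConsumerConfig overloads shown here are assumptions, so check the current API for the exact signatures:

namespace KafkaConfiguredConsumer;

using System.Threading.Tasks;
using Confluent.Kafka;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer>();

                // the Confluent ClientConfig supplies the broker-level settings
                rider.UsingKafka(new ClientConfig { BootstrapServers = "localhost:9092" }, (context, k) =>
                {
                    // a Confluent ConsumerConfig can be supplied per endpoint, and
                    // individual settings can still be overridden on the configurator
                    k.TopicEndpoint<KafkaMessage>("topic-name",
                        new ConsumerConfig { GroupId = "consumer-group-name", AutoOffsetReset = AutoOffsetReset.Earliest },
                        e =>
                        {
                            e.ConfigureConsumer<KafkaMessageConsumer>(context);
                        });
                });
            });
        });
    }

    class KafkaMessageConsumer :
        IConsumer<KafkaMessage>
    {
        public Task Consume(ConsumeContext<KafkaMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}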

Checkpoint

The rider implementation takes full responsibility for checkpointing; this behavior cannot be changed. The checkpointer can be configured per topic through the following properties:

Name                     Description                                           Default
CheckpointInterval       Checkpoint frequency based on time                    1 min
CheckpointMessageCount   Checkpoint every X messages                           5000
MessageLimit             Checkpoint buffer size without blocking consumption   10000

Note that each topic partition has its own checkpointer, and the configuration is applied per partition, not to the entire topic.

During a graceful shutdown, the checkpointer attempts to checkpoint all messages that have already been consumed. Forced shutdown should be avoided, as it can result in the same messages being consumed more than once.
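
For example, a minimal sketch adjusting the checkpoint settings on a topic endpoint, using the property names from the table above (the exact property types are assumed):

namespace KafkaCheckpointConsumer;

using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer>();

                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");

                    k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
                    {
                        // checkpoint at least every 30 seconds per partition
                        e.CheckpointInterval = TimeSpan.FromSeconds(30);

                        // or after every 1000 consumed messages, whichever comes first
                        e.CheckpointMessageCount = 1000;

                        // buffer at most 2000 un-checkpointed messages before pausing consumption
                        e.MessageLimit = 2000;

                        e.ConfigureConsumer<KafkaMessageConsumer>(context);
                    });
                });
            });
        });
    }

    class KafkaMessageConsumer :
        IConsumer<KafkaMessage>
    {
        public Task Consume(ConsumeContext<KafkaMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}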

Scalability

Riders are designed with performance in mind: each topic partition is handled on a separate thread pool. Consumption within the same partition can also be scaled up by using the message Key; as long as keys differ, messages are processed concurrently, all without sacrificing ordering.

Name                      Description                                                         Default
ConcurrentConsumerLimit   Number of Confluent consumer instances within the same endpoint    1
ConcurrentDeliveryLimit   Messages delivered concurrently within the same partition and key  1
ConcurrentMessageLimit    Messages processed concurrently across different keys              1
PrefetchCount             Messages to prefetch from the Kafka topic into memory              1000

Increasing ConcurrentDeliveryLimit breaks ordering, which can be helpful for topics where ordering is not required. ConcurrentMessageLimit preserves ordering across keys; when the keys are the same for an entire partition, ConcurrentDeliveryLimit is used instead.

ConcurrentConsumerLimit is a very powerful setting: a Confluent consumer reads one partition at a time, so raising this limit creates multiple consumers that read from separate partitions. However, configuring more consumers than the total number of partitions results in idle consumers.
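
For example, a minimal sketch applying these settings to a topic endpoint (property names from the table above; the values are illustrative):

namespace KafkaScaledConsumer;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer>();

                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");

                    k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
                    {
                        // two Confluent consumer instances, reading separate partitions
                        // (keep this at or below the topic's partition count)
                        e.ConcurrentConsumerLimit = 2;

                        // process up to 16 messages concurrently across different keys,
                        // preserving per-key ordering
                        e.ConcurrentMessageLimit = 16;

                        // prefetch more messages from the topic into memory
                        e.PrefetchCount = 2000;

                        e.ConfigureConsumer<KafkaMessageConsumer>(context);
                    });
                });
            });
        });
    }

    class KafkaMessageConsumer :
        IConsumer<KafkaMessage>
    {
        public Task Consume(ConsumeContext<KafkaMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}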

Configure Topology

Kafka is not intended to have topology created during application startup. Topics should be created beforehand with the correct number of partitions and replicas.
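
For example, a topic can be created ahead of time with the Confluent.Kafka admin client; the topic name, partition count, and replication factor below are illustrative:

namespace KafkaTopicSetup;

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

public class Program
{
    public static async Task Main()
    {
        using var adminClient = new AdminClientBuilder(
            new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build();

        try
        {
            // create the topic with the desired partition and replica counts before starting the bus
            await adminClient.CreateTopicsAsync(new[]
            {
                new TopicSpecification
                {
                    Name = "topic-name",
                    NumPartitions = 6,
                    ReplicationFactor = 3
                }
            });
        }
        catch (CreateTopicsException e)
        {
            Console.WriteLine($"Topic creation failed: {e.Results[0].Error.Reason}");
        }
    }
}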

Producers

Producing messages to Kafka topics requires the producer to be registered. The producer can then be used to produce messages to the specified Kafka topic. In the example below, messages are produced to the Kafka topic as they are entered by the user.

namespace KafkaProducer;

using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddProducer<KafkaMessage>("topic-name");

                rider.UsingKafka((context, k) => { k.Host("localhost:9092"); });
            });
        });

        var provider = services.BuildServiceProvider();

        var busControl = provider.GetRequiredService<IBusControl>();

        await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
        try
        {
            var producer = provider.GetRequiredService<ITopicProducer<KafkaMessage>>();
            do
            {
                string value = await Task.Run(() =>
                {
                    Console.WriteLine("Enter text (or quit to exit)");
                    Console.Write("> ");
                    return Console.ReadLine();
                });

                if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                    break;

                await producer.Produce(new
                {
                    Text = value
                });
            } while (true);
        }
        finally
        {
            await busControl.StopAsync();
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}

Producer Provider

With MassTransit v8.1, you can dynamically resolve a producer using ITopicProducerProvider (registered as Scoped).

namespace KafkaProducer;

using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.UsingKafka((context, k) => { k.Host("localhost:9092"); });
            });
        });

        var provider = services.BuildServiceProvider();

        var busControl = provider.GetRequiredService<IBusControl>();

        await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
        try
        {
            var producerProvider = provider.GetRequiredService<ITopicProducerProvider>();
            
            do
            {
                string value = await Task.Run(() =>
                {
                    Console.WriteLine("Enter topic name (or quit to exit)");
                    Console.Write("> ");
                    return Console.ReadLine();
                });

                if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                    break;
                
                var producer = producerProvider.GetProducer<KafkaMessage>(new Uri($"topic:{value}"));

                await producer.Produce(new
                {
                    TopicName = value
                });
            } while (true);
        }
        finally
        {
            await busControl.StopAsync();
        }
    }

    public record KafkaMessage
    {
        public string TopicName { get; init; }
    }
}

Tombstone message

A record with the same key as the record to be deleted is produced to the same topic and partition with a null payload. These records are called tombstones. A tombstone can be produced by setting a custom value serializer during produce:

namespace KafkaTombstoneProducer;

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddProducer<string, KafkaMessage>("topic-name");

                rider.UsingKafka((context, k) => { k.Host("localhost:9092"); });
            });
        });

        var provider = services.BuildServiceProvider();

        var busControl = provider.GetRequiredService<IBusControl>();

        await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
        try
        {
            var producer = provider.GetRequiredService<ITopicProducer<string, KafkaMessage>>();
            do
            {
                string value = await Task.Run(() =>
                {
                    Console.WriteLine("Enter text (or quit to exit)");
                    Console.Write("> ");
                    return Console.ReadLine();
                });

                if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                    break;

                await producer.Produce("key", new { }, Pipe.Execute<KafkaSendContext<string, KafkaMessage>>(context =>
                {
                    // override the value serializer so the record is produced with an empty (tombstone) payload
                    context.ValueSerializer = new TombstoneSerializer<KafkaMessage>();
                }));
            } while (true);
        }
        finally
        {
            await busControl.StopAsync();
        }
    }

    class TombstoneSerializer<T> :
        IAsyncSerializer<T>
    {
        public Task<byte[]> SerializeAsync(T data, SerializationContext context)
        {
            // serialize every value as an empty payload, producing a tombstone record
            return Task.FromResult(Array.Empty<byte>());
        }
    }

    public record KafkaMessage
    {
    }
}

Producing and Consuming Multiple Message Types on a Single Topic

There are situations where you might want to produce / consume events of different types on the same Kafka topic. A common use case is to use a single topic to log ordered meaningful state change events like SomethingRequested, SomethingStarted, SomethingFinished.

Confluent has some documentation about how this can be implemented on the Schema Registry side.

Unfortunately, it is not yet widely supported in client tools and products, and there is limited documentation about how to support it in your own applications.

However, it is possible... The following demo uses the MassTransit Kafka Rider with custom Avro serializer / deserializer implementations and the Schema Registry to support multiple event types on a single topic:

MassTransit-Kafka-Demo

The custom serializer / deserializer implementations leverage the wire format used by the standard Confluent schema-based serializers, which includes the schema id in the data stored for each message. This is also good news for interoperability with non-MassTransit applications.
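
For reference, the standard Confluent wire format prefixes each payload with a magic byte and a four-byte big-endian schema id. A minimal sketch (not part of the demo) for extracting the schema id from a serialized value:

namespace ConfluentWireFormat;

using System;
using System.Buffers.Binary;

public static class SchemaIdReader
{
    // Confluent wire format: [0x00 magic byte][4-byte big-endian schema id][payload]
    public static int ReadSchemaId(ReadOnlySpan<byte> value)
    {
        if (value.Length < 5 || value[0] != 0)
            throw new InvalidOperationException("Not Confluent schema-registry framed data");

        return BinaryPrimitives.ReadInt32BigEndian(value.Slice(1, 4));
    }
}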

Warning: It's a little hacky and only supports the Avro format, but there's enough there to get you started.