Kafka Configuration
Kafka is supported as a Rider, enabling messages to be consumed from and produced to Kafka topics. The Confluent .NET client is used, and has been tested against the community edition (running in Docker).
This sample shows how to use MassTransit with Kafka, including Confluent Cloud.
Topic Endpoints
Requires the MassTransit.Kafka NuGet package.
To consume a Kafka topic, configure a Rider within the bus configuration as shown.
```csharp
namespace KafkaConsumer;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;


public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();
        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer>();

                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");

                    k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
                    {
                        e.ConfigureConsumer<KafkaMessageConsumer>(context);
                    });
                });
            });
        });
    }

    class KafkaMessageConsumer :
        IConsumer<KafkaMessage>
    {
        public Task Consume(ConsumeContext<KafkaMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}
```
A TopicEndpoint connects a Kafka Consumer to a topic, using the specified topic name. The consumer group specified should be unique to the application, and shared by a cluster of service instances for load balancing. Consumers and sagas can be configured on the topic endpoint, which should be registered in the rider configuration. While the configuration for topic endpoints is the same as a receive endpoint, there is no implicit binding of consumer message types to Kafka topics. The message type is specified on the TopicEndpoint as a generic argument.
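For instance, a saga can be connected to a topic endpoint in the same way as a consumer. The fragment below is a minimal sketch of the rider configuration; the OrderStateMachine, OrderState, and OrderSubmitted types and the order-events topic are hypothetical names used for illustration.

```csharp
// Hypothetical saga registration inside the rider configuration; the
// OrderStateMachine, OrderState, and OrderSubmitted types are illustrative
rider.AddSagaStateMachine<OrderStateMachine, OrderState>()
    .InMemoryRepository();

rider.UsingKafka((context, k) =>
{
    k.Host("localhost:9092");

    k.TopicEndpoint<OrderSubmitted>("order-events", "order-service", e =>
    {
        // Sagas are connected to the topic endpoint just like consumers
        e.ConfigureSaga<OrderState>(context);
    });
});
```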
Wildcard support
Kafka supports subscribing to multiple topics using a regular expression (also called a wildcard) that matches multiple topic names:
```csharp
namespace KafkaWildcardConsumer;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;


public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();
        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer>();

                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");

                    // The leading ^ makes the Confluent client treat the
                    // topic name as a regular expression
                    k.TopicEndpoint<KafkaMessage>("^topic-[0-9]*", "consumer-group-name", e =>
                    {
                        e.ConfigureConsumer<KafkaMessageConsumer>(context);
                    });
                });
            });
        });
    }

    class KafkaMessageConsumer :
        IConsumer<KafkaMessage>
    {
        public Task Consume(ConsumeContext<KafkaMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}
```
Configuration
Configuration can be supplied through the Confluent client configs, or individual settings can be overridden using the MassTransit configurator.
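A minimal sketch of both approaches: a Confluent ClientConfig supplies the base settings, and the configurator overrides values per endpoint. The UsingKafka overload accepting a ClientConfig and the AutoOffsetReset endpoint property are assumptions based on the rider's configuration API; verify them against your MassTransit version (ClientConfig and AutoOffsetReset come from Confluent.Kafka).

```csharp
// A minimal sketch: base settings come from the Confluent ClientConfig,
// and the configurator overrides individual values per endpoint
// (the overload and property shown are assumptions; check your version)
var clientConfig = new ClientConfig { BootstrapServers = "localhost:9092" };

rider.UsingKafka(clientConfig, (context, k) =>
{
    k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
    {
        // Start from the earliest offset when no committed offset exists
        e.AutoOffsetReset = AutoOffsetReset.Earliest;

        e.ConfigureConsumer<KafkaMessageConsumer>(context);
    });
});
```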
Checkpoint
The rider implementation takes full responsibility for checkpointing (committing consumed offsets); this behavior cannot be replaced. The checkpointer can be configured on a per-topic basis through the following properties:
| Name | Description | Default |
|---|---|---|
| CheckpointInterval | Checkpoint frequency based on time | 1 min |
| CheckpointMessageCount | Checkpoint every X messages | 5000 |
| MessageLimit | Checkpoint buffer size without blocking consumption | 10000 |
During a graceful shutdown, the checkpointer attempts to checkpoint all messages that have already been consumed. Forced shutdown should be avoided, as it can result in the same messages being consumed more than once.
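These properties are set directly on the topic endpoint configurator, inside the UsingKafka configuration from the earlier example. A minimal sketch, using the default values (property names are taken from the table above; verify their exact types against your MassTransit version):

```csharp
k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
{
    // Commit offsets at least once a minute, or every 5000 messages,
    // whichever comes first (these are the default values)
    e.CheckpointInterval = TimeSpan.FromMinutes(1);
    e.CheckpointMessageCount = 5000;

    // Up to 10000 consumed-but-unconfirmed messages may be buffered
    // before consumption is paused
    e.MessageLimit = 10000;

    e.ConfigureConsumer<KafkaMessageConsumer>(context);
});
```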
Scalability
Riders are designed with performance in mind: each topic partition is handled on a separate thread pool. Consumption within a single partition can also be scaled up using the message Key; messages with different keys are processed concurrently, all without sacrificing per-key ordering.
| Name | Description | Default |
|---|---|---|
| ConcurrentConsumerLimit | Number of Confluent consumer instances within the same endpoint | 1 |
| ConcurrentDeliveryLimit | Number of messages delivered concurrently within the same partition and key. Increasing this value breaks ordering; useful for topics where ordering is not required | 1 |
| ConcurrentMessageLimit | Number of messages processed concurrently across different keys (preserving per-key ordering). When all keys in a partition are the same, ConcurrentDeliveryLimit is used instead | 1 |
| PrefetchCount | Number of messages to prefetch from the Kafka topic into memory | 1000 |
ConcurrentConsumerLimit is a very powerful setting: because a Confluent consumer reads one partition at a time, raising it creates multiple consumers that read from separate partitions. However, configuring more consumers than the total number of partitions results in idle consumers.
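A minimal sketch of tuning these settings on the topic endpoint, assuming for illustration a topic with six partitions:

```csharp
k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
{
    // Three Confluent consumer instances share the six partitions;
    // anything above six would leave consumers idle
    e.ConcurrentConsumerLimit = 3;

    // Up to ten different keys processed concurrently per partition,
    // preserving per-key ordering
    e.ConcurrentMessageLimit = 10;

    // Keep up to 1000 messages in memory ahead of the consumer
    e.PrefetchCount = 1000;

    e.ConfigureConsumer<KafkaMessageConsumer>(context);
});
```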
Producers
Producing messages to Kafka topics requires the producer to be registered. The producer can then be used to produce messages to the specified Kafka topic. In the example below, messages are produced to the Kafka topic as they are entered by the user.
```csharp
namespace KafkaProducer;

using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;


public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();
        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddProducer<KafkaMessage>("topic-name");

                rider.UsingKafka((context, k) => { k.Host("localhost:9092"); });
            });
        });

        var provider = services.BuildServiceProvider();

        var busControl = provider.GetRequiredService<IBusControl>();

        await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);

        try
        {
            var producer = provider.GetRequiredService<ITopicProducer<KafkaMessage>>();

            do
            {
                string value = await Task.Run(() =>
                {
                    Console.WriteLine("Enter text (or quit to exit)");
                    Console.Write("> ");
                    return Console.ReadLine();
                });

                if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                    break;

                await producer.Produce(new
                {
                    Text = value
                });
            }
            while (true);
        }
        finally
        {
            await busControl.StopAsync();
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}
```
Producer Provider
With MassTransit v8.1, you can dynamically resolve a producer using ITopicProducerProvider (registered as Scoped).
```csharp
namespace KafkaProducer;

using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;


public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();
        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.UsingKafka((context, k) => { k.Host("localhost:9092"); });
            });
        });

        var provider = services.BuildServiceProvider();

        var busControl = provider.GetRequiredService<IBusControl>();

        await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);

        try
        {
            var producerProvider = provider.GetRequiredService<ITopicProducerProvider>();

            do
            {
                string value = await Task.Run(() =>
                {
                    Console.WriteLine("Enter topic name (or quit to exit)");
                    Console.Write("> ");
                    return Console.ReadLine();
                });

                if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                    break;

                var producer = producerProvider.GetProducer<KafkaMessage>(new Uri($"topic:{value}"));

                await producer.Produce(new
                {
                    TopicName = value
                });
            }
            while (true);
        }
        finally
        {
            await busControl.StopAsync();
        }
    }

    public record KafkaMessage
    {
        public string TopicName { get; init; }
    }
}
```
Tombstone message
A record with the same key as the record to be deleted is produced to the same topic and partition with a null payload. These records are called tombstones. This can be done by setting a custom value serializer when producing:
```csharp
namespace KafkaTombstoneProducer;

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;


public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();
        services.AddMassTransit(x =>
        {
            x.UsingInMemory();

            x.AddRider(rider =>
            {
                rider.AddProducer<string, KafkaMessage>("topic-name");

                rider.UsingKafka((context, k) => { k.Host("localhost:9092"); });
            });
        });

        var provider = services.BuildServiceProvider();

        var busControl = provider.GetRequiredService<IBusControl>();

        await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);

        try
        {
            var producer = provider.GetRequiredService<ITopicProducer<string, KafkaMessage>>();

            do
            {
                string value = await Task.Run(() =>
                {
                    Console.WriteLine("Enter text (or quit to exit)");
                    Console.Write("> ");
                    return Console.ReadLine();
                });

                if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                    break;

                await producer.Produce("key", new { }, Pipe.Execute<KafkaSendContext<string, KafkaMessage>>(context =>
                {
                    // Override the value serializer so the tombstone record
                    // is produced without a serialized message body
                    context.ValueSerializer = new TombstoneSerializer<KafkaMessage>();
                }));
            }
            while (true);
        }
        finally
        {
            await busControl.StopAsync();
        }
    }

    class TombstoneSerializer<T> :
        IAsyncSerializer<T>
    {
        public Task<byte[]> SerializeAsync(T data, SerializationContext context)
        {
            return Task.FromResult(Array.Empty<byte>());
        }
    }

    public record KafkaMessage
    {
    }
}
```
Producing and Consuming Multiple Message Types on a Single Topic
There are situations where you might want to produce / consume events of different types on the same Kafka topic. A common use case is to use a single topic to log ordered, meaningful state change events such as SomethingRequested, SomethingStarted, SomethingFinished.
Confluent has some documentation about how this can be implemented on the Schema Registry side:
- Confluent Docs - Multiple Event Types in the Same Topic
- Confluent Docs - Multiple Event Types in the Same Topic with Avro
- Confluent Blog - Multiple Event Types in the Same Topic
Unfortunately, it is not yet widely supported in client tools and products and there is limited documentation about how to support this in your own applications.
However, it is possible... The following demo uses the MassTransit Kafka Rider with custom Avro serializer / deserializer implementations and the Schema Registry to support multiple event types on a single topic.
The custom serializer / deserializer implementations leverage the wire format used by the standard Confluent schema-based serializers, which includes the schema id in the data stored for each message. This is also good news for interoperability with non-MassTransit applications.
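For reference, that wire format is a five-byte header ahead of the serialized payload: a zero magic byte followed by the schema id as a four-byte big-endian integer. A minimal sketch of extracting the schema id, which a custom deserializer can use to look up the writer schema:

```csharp
using System;
using System.Buffers.Binary;

static class ConfluentWireFormat
{
    // Reads the schema id from the documented Confluent Schema Registry
    // framing: magic byte 0x00, then a 4-byte big-endian schema id
    public static int ReadSchemaId(ReadOnlySpan<byte> payload)
    {
        if (payload.Length < 5 || payload[0] != 0)
            throw new InvalidOperationException("Payload does not use the Confluent wire format");

        return BinaryPrimitives.ReadInt32BigEndian(payload.Slice(1, 4));
    }
}
```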
Warning: It's a little hacky and only supports the Avro format, but there's enough there to get you started.