Configuration
MassTransit is usable in most .NET application types. MassTransit is easily configured in ASP.NET Core or .NET Generic Host applications (using .NET 6 or later).
To use MassTransit, add the MassTransit package (from NuGet) and start with the AddMassTransit method shown below.
using MassTransit;services.AddMassTransit(x =>{ // A Transport x.UsingRabbitMq((context, cfg) => { });});
In this configuration, the following variables are used:
Variable | Type | Description |
---|---|---|
x | IBusRegistrationConfigurator | Configure the bus instance (not transport specific) and the underlying service collection |
context | IBusRegistrationContext | The configured bus context, also implements IServiceProvider |
cfg | IRabbitMqBusFactoryConfigurator | Configure the bus specific to the transport (each transport has its own interface type |
x
) should be called outside of this callback.Adding MassTransit, as shown above, will configure the service collection with required components, including:
- Several interfaces (and their implementations, appropriate for the transport specified)
IBus
(singleton)IBusControl
(singleton)IReceiveEndpointConnector
(singleton)ISendEndpointProvider
(scoped)IPublishEndpoint
(scoped)IRequestClient<T>
(scoped)
- The bus endpoint with the default settings (not started by default)
- The MassTransitHostedService
- Health checks for the bus (or buses) and receive endpoints
- Using
ILoggerFactory
for log output
To configure multiple bus instances in the same service collection, refer to the MultiBus section.
Host Options
MassTransit adds a hosted service so that the generic host can start and stop the bus (or buses, if multiple bus instances are configured). The host options can be configured via MassTransitHostOptions using the Options pattern as shown below.
services.AddOptions<MassTransitHostOptions>() .Configure(options => { });
Option | Description |
---|---|
WaitUntilStarted | By default, MassTransit connects to the broker asynchronously. When set to true, the MassTransit Hosted Service will block startup until the broker connection has been established. |
StartTimeout | By default, MassTransit waits infinitely until the broker connection is established. If specified, MassTransit will give up after the timeout has expired. |
StopTimeout | MassTransit waits infinitely for the bus to stop, including any active message consumers. If specified, MassTransit will force the bus to stop after the timeout has expired. |
ConsumerStopTimeout | If specified, the ConsumeContext.CancellationToken will be canceled after the specified timeout when the bus is stopping. This allows long-running consumers to observe the cancellation token and react accordingly. Must be <= the StopTimeout |
The .NET Generic Host has its own internal shutdown timeout.
Transport Options
Each supported transport can be configured via a .Host()
method or via the .NET Options Pattern.
- Rabbit MQ: RabbitMqTransportOptions
- Azure Service Bus: AzureServiceBusTransportOptions
- Amazon SQS: AmazonSqsTransportOptions
Consumer Registration
To consume messages, one or more consumers must be added and receive endpoints configured for the added consumers. MassTransit connects each receive endpoint to a queue on the message broker.
To add a consumer and automatically configure a receive endpoint for the consumer, call one of the AddConsumer methods and call ConfigureEndpoints as shown below.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>(); x.UsingRabbitMq((context, cfg) => { cfg.ConfigureEndpoints(context); });});
MassTransit will automatically configure a receive endpoint for the SubmitOrderConsumer using the name returned by the configured endpoint name formatter. When the bus is started, the receive endpoint will be started and messages will be delivered from the queue by the transport to an instance of the consumer.
All consumer types can be added, including consumers, sagas, saga state machines, and routing slip activities. If a job consumer is added, additional configuration is required.
Learn about the default conventions as well as how to tailor the naming style to meet your requirements in this short video:
To exclude a consumer, saga, or routing slip activity from automatic configuration, use the ExcludeFromConfigureEndpoints extension method when adding the consumer:
x.AddConsumer<SubmitOrderConsumer>() .ExcludeFromConfigureEndpoints()
Alternatively, the ExcludeFromConfigureEndpoints attribute may be specified on the consumer.
[ExcludeFromConfigureEndpoints]public class SubmitOrderConsumer : IConsumer<SubmitOrder>{}
Configure Endpoints
As shown in the example above, using ConfigureEndpoints
is the preferred approach to configure receive endpoints. By registering consumers, sagas, and routing slip activities along with their optional definitions, MassTransit is able to configure receive endpoints for all registered consumer types. Receive endpoint names are generated using an endpoint name formatter (unless otherwise specified in a definition), and each receive endpoint is configured.
As receive endpoints are configured, one or more consumer types are configured on each receive endpoint. If multiple consumer types share the same endpoint name, those consumer types will be configured on the same receive endpoint. For each consumer type, its respective consumer, saga, or activity definition will be applied to the receive endpoint.
Configure Endpoints Callback
To apply receive endpoint settings or configure middleware for all receive endpoints configured by ConfigureEndpoints
, a callback can be added.
x.AddConfigureEndpointsCallback((name, cfg) =>{ cfg.UseMessageRetry(r => r.Immediate(2));});
When ConfigureEndpoints
is called, any registered callbacks will be called for every recieve endpoint endpoint. Each callback will only be called once per receive endpoint.
To conditionally apply transport-specific settings, the cfg
parameter can be pattern-matched to the transport type as shown below.
x.AddConfigureEndpointsCallback((name, cfg) =>{ if (cfg is IRabbitMqReceiveEndpointConfigurator rmq) rmq.SetQuorumQueue(3); cfg.UseMessageRetry(r => r.Immediate(2)); });
Endpoint Name Formatters
ConfigureEndpoints uses an IEndpointNameFormatter
to format the queue names for all supported consumer types. The default endpoint name formatter returns PascalCase class names without the namespace. There are several built-in endpoint name formatters included. For the SubmitOrderConsumer, the receive endpoint names would be formatted as shown below. Note that class suffixes such as Consumer, Saga, and Activity are trimmed from the endpoint name by default.
Format | Configuration | Name |
---|---|---|
Default | SetDefaultEndpointNameFormatter | SubmitOrder |
Snake Case | SetSnakeCaseEndpointNameFormatter | submit_order |
Kebab Case | SetKebabCaseEndpointNameFormatter | submit-order |
The endpoint name formatters can also be customized by constructing a new instance and configuring MassTransit to use it.
x.SetEndpointNameFormatter(new KebabCaseEndpointNameFormatter(prefix: "Dev"));
By specifying a prefix, the endpoint name would be dev-submit-order
. This is useful when sharing a single broker with multiple developers (Amazon SQS is account-wide, for instance).
When using MultiBus with different endpoint name formatters for each bus...
Receive Endpoints
The previous examples use conventions to configure receive endpoints. Alternatively, receive endpoints can be explicitly configured.
When configuring endpoints manually, ConfigureEndpoints should be excluded or be called after any explicitly configured receive endpoints.
To explicitly configure endpoints, use the ConfigureConsumer or ConfigureConsumers method.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>(); x.UsingRabbitMq((context, cfg) => { cfg.ReceiveEndpoint("order-service", e => { e.ConfigureConsumer<SubmitOrderConsumer>(context); }); });});
Receive endpoints have transport-independent settings that can be configured.
Name | Description | Default |
---|---|---|
PrefetchCount | Number of unacknowledged messages delivered by the broker | max(CPU Count x 2,16) |
ConcurrentMessageLimit | Number of concurrent messages delivered to consumers | (none, uses PrefetchCount) |
ConfigureConsumeTopology | Create exchanges/topics on the broker and bind them to the receive endpoint | true |
PublishFaults | Publish Fault<T> events when consumers fault | true |
DefaultContentType | The default content type for received messages | See serialization |
SerializerContentType | The default content type for sending/publishing messages | See serialization |
The PrefetchCount, ConcurrentMessageLimit, and serialization settings can be specified at the bus level and will be applied to all receive endpoints.
In the following example, the PrefetchCount is set to 32 and the ConcurrentMessageLimit is set to 28.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>(); x.UsingRabbitMq((context, cfg) => { cfg.PrefetchCount = 32; // applies to all receive endpoints cfg.ReceiveEndpoint("order-service", e => { e.ConcurrentMessageLimit = 28; // only applies to this endpoint e.ConfigureConsumer<SubmitOrderConsumer>(context); }); });});
When using ConfigureConsumer with a consumer that has a definition, the EndpointName, PrefetchCount, and Temporary properties of the consumer definition are not used.
Temporary Endpoints
Some consumers only need to receive messages while connected, and any messages published while disconnected should be discarded. This can be achieved by using a TemporaryEndpointDefinition to configure the receive endpoint.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>(); x.UsingInMemory((context, cfg) => { cfg.ReceiveEndpoint(new TemporaryEndpointDefinition(), e => { e.ConfigureConsumer<SubmitOrderConsumer>(context); }); cfg.ConfigureEndpoints(context); });});
Consumer Definition
A consumer definition is used to configure the receive endpoint and pipeline behavior for the consumer. When scanning assemblies or namespaces for consumers, consumer definitions are also found and added to the container. The SubmitOrderConsumer and matching definition are shown below.
class SubmitOrderConsumer : IConsumer<SubmitOrder>{ readonly ILogger<SubmitOrderConsumer> _logger; public SubmitOrderConsumer(ILogger<SubmitOrderConsumer> logger) { _logger = logger; } public async Task Consume(ConsumeContext<SubmitOrder> context) { _logger.LogInformation("Order Submitted: {OrderId}", context.Message.OrderId); await context.Publish<OrderSubmitted>(new { context.Message.OrderId }); }}class SubmitOrderConsumerDefinition : ConsumerDefinition<SubmitOrderConsumer>{ public SubmitOrderConsumerDefinition() { // override the default endpoint name EndpointName = "order-service"; // limit the number of messages consumed concurrently // this applies to the consumer only, not the endpoint ConcurrentMessageLimit = 8; } protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator) { // configure message retry with millisecond intervals endpointConfigurator.UseMessageRetry(r => r.Intervals(100,200,500,800,1000)); // use the outbox to prevent duplicate events from being published endpointConfigurator.UseInMemoryOutbox(); }}
Endpoint Configuration
To configure the endpoint for a consumer registration, or override the endpoint configuration in the definition, the Endpoint
method can be added to the consumer registration. This will create an endpoint definition for the consumer, and register it in the container. This method is available on consumer and saga registrations, with separate execute and compensate endpoint methods for activities.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition)) .Endpoint(e => { // override the default endpoint name e.Name = "order-service-extreme"; // specify the endpoint as temporary (may be non-durable, auto-delete, etc.) e.Temporary = false; // specify an optional concurrent message limit for the consumer e.ConcurrentMessageLimit = 8; // only use if needed, a sensible default is provided, and a reasonable // value is automatically calculated based upon ConcurrentMessageLimit if // the transport supports it. e.PrefetchCount = 16; // set if each service instance should have its own endpoint for the consumer // so that messages fan out to each instance. e.InstanceId = "something-unique"; }); x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));});
When the endpoint is configured after the AddConsumer method, the configuration then overrides the endpoint configuration in the consumer definition. However, it cannot override the EndpointName
if it is specified in the constructor. The order of precedence for endpoint naming is explained below.
- Specifying
EndpointName = "submit-order-extreme"
in the constructor which cannot be overriddenx.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()public SubmitOrderConsumerDefinition(){ EndpointName = "submit-order-extreme";}
- Specifying
.Endpoint(x => x.Name = "submit-order-extreme")
in the consumer registration, chained toAddConsumer
x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>() .Endpoint(x => x.Name = "submit-order-extreme");public SubmitOrderConsumerDefinition(){ Endpoint(x => x.Name = "not used");}
- Specifying
Endpoint(x => x.Name = "submit-order-extreme")
in the constructor, which creates an endpoint definitionx.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()public SubmitOrderConsumerDefinition(){ Endpoint(x => x.Name = "submit-order-extreme");}
- Unspecified, the endpoint name formatter is used (in this case, the endpoint name is
SubmitOrder
using the default formatter)x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()public SubmitOrderConsumerDefinition(){}
Saga Registration
To add a state machine saga, use the AddSagaStateMachine methods. For a consumer saga, use the AddSaga methods.
services.AddMassTransit(r =>{ // add a state machine saga, with the in-memory repository r.AddSagaStateMachine<OrderStateMachine, OrderState>() .InMemoryRepository(); // add a consumer saga with the in-memory repository r.AddSaga<OrderSaga>() .InMemoryRepository(); // add a saga by type, without a repository. The repository should be registered // in the container elsewhere r.AddSaga(typeof(OrderSaga)); // add a state machine saga by type, including a saga definition for that saga r.AddSagaStateMachine(typeof(OrderState), typeof(OrderStateDefinition)) // add all saga state machines by type r.AddSagaStateMachines(Assembly.GetExecutingAssembly()); // add all sagas in the specified assembly r.AddSagas(Assembly.GetExecutingAssembly()); // add sagas from the namespace containing the type r.AddSagasFromNamespaceContaining<OrderSaga>(); r.AddSagasFromNamespaceContaining(typeof(OrderSaga));});
To add a saga registration and configure the consumer endpoint in the same expression, a definition can automatically be created.
services.AddMassTransit(r =>{ r.AddSagaStateMachine<OrderStateMachine, OrderState>() .NHibernateRepository() .Endpoint(e => { e.Name = "order-state"; e.ConcurrentMessageLimit = 8; });});
Supported saga persistence storage engines are documented in the saga documentation section.
services.AddMassTransit(x =>{ x.AddConsumer<ValueEnteredEventConsumer>(); x.SetKebabCaseEndpointNameFormatter(); x.UsingRabbitMq((context, cfg) => { cfg.ConfigureEndpoints(context); });});
And the consumer:
class ValueEnteredEventConsumer : IConsumer<ValueEntered>{ ILogger<ValueEnteredEventConsumer> _logger; public ValueEnteredEventConsumer(ILogger<ValueEnteredEventConsumer> logger) { _logger = logger; } public async Task Consume(ConsumeContext<ValueEntered> context) { _logger.LogInformation("Value: {Value}", context.Message.Value); }}
An ASP.NET Core application can also configure receive endpoints. The consumer, along with the receive endpoint, is configured within the AddMassTransit configuration. Separate registration of the consumer is not required (and discouraged), however, any consumer dependencies should be added to the container separately. Consumers are registered as scoped, and dependencies should be registered as scoped when possible, unless they are singletons.
services.AddMassTransit(x =>{ x.AddConsumer<EventConsumer>(); x.UsingRabbitMq((context, cfg) => { cfg.ReceiveEndpoint("event-listener", e => { e.ConfigureConsumer<EventConsumer>(context); }); });});
class EventConsumer : IConsumer<ValueEntered>{ ILogger<EventConsumer> _logger; public EventConsumer(ILogger<EventConsumer> logger) { _logger = logger; } public async Task Consume(ConsumeContext<ValueEntered> context) { _logger.LogInformation("Value: {Value}", context.Message.Value); }}
Health Checks
The AddMassTransit method adds an IHealthCheck
to the service collection that you can use to monitor your health. The health check is added with the tags ready
and masstransit
.
To configure health checks, map the ready and live endpoints in your ASP.NET application.
app.UseEndpoints(endpoints =>{ endpoints.MapHealthChecks("/health/ready", new HealthCheckOptions() { Predicate = (check) => check.Tags.Contains("ready"), }); endpoints.MapHealthChecks("/health/live", new HealthCheckOptions());});
Example Output
{ "status": "Healthy", "totalDuration": "00:00:00.2134026", "entries": { "masstransit-bus": { "data": { "Endpoints": { "rabbitmq://localhost/dev-local/SubmitOrder": { "status": "Healthy", "description": "ready" } } }, "description": "Ready", "duration": "00:00:00.1853530", "status": "Healthy", "tags": [ "ready", "masstransit" ] } }}