Configuration
MassTransit can be used in most .NET application types and is easily configured in ASP.NET Core or .NET Generic Host applications (using .NET 6 or later).
To use MassTransit, add the MassTransit package (from NuGet) and start with the AddMassTransit method shown below.
```csharp
using MassTransit;

services.AddMassTransit(x =>
{
    // A Transport
    x.UsingRabbitMq((context, cfg) =>
    {
    });
});
```
In this configuration, the following variables are used:
Variable | Type | Description |
---|---|---|
x | IBusRegistrationConfigurator | Configure the bus instance (not transport specific) and the underlying service collection |
context | IBusRegistrationContext | The configured bus context, also implements IServiceProvider |
cfg | IRabbitMqBusFactoryConfigurator | Configure the bus specific to the transport (each transport has its own interface type) |

Any methods that configure the bus instance (using x) should be called outside of the UsingRabbitMq callback.

Adding MassTransit, as shown above, will configure the service collection with required components, including:
- Several interfaces (and their implementations, appropriate for the transport specified)
  - IBus (singleton)
  - IBusControl (singleton)
  - IReceiveEndpointConnector (singleton)
  - ISendEndpointProvider (scoped)
  - IPublishEndpoint (scoped)
  - IRequestClient<T> (scoped)
- The bus endpoint with the default settings (not started by default)
- The MassTransitHostedService
- Health checks for the bus (or buses) and receive endpoints
- Using ILoggerFactory for log output
To configure multiple bus instances in the same service collection, refer to the MultiBus section.
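As a minimal sketch of how one of the registered interfaces might be consumed, the class below takes IPublishEndpoint as a constructor dependency and publishes an event. OrderSubmitter and the OrderReceived record are hypothetical names introduced only for illustration. Because IPublishEndpoint is registered as scoped, this sketch assumes the class is itself resolved from a scope (for example, registered with AddScoped and used within an ASP.NET Core request).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, not part of MassTransit
public record OrderReceived(Guid OrderId);

// Hypothetical application service that publishes an event
public class OrderSubmitter
{
    readonly IPublishEndpoint _publishEndpoint;

    public OrderSubmitter(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public Task Submit(Guid orderId, CancellationToken cancellationToken = default)
    {
        // Publish delivers the event to every endpoint subscribed to OrderReceived
        return _publishEndpoint.Publish(new OrderReceived(orderId), cancellationToken);
    }
}
```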
Host Options
MassTransit adds a hosted service so that the generic host can start and stop the bus (or buses, if multiple bus instances are configured). The host options can be configured via MassTransitHostOptions using the Options pattern as shown below.
```csharp
services.AddOptions<MassTransitHostOptions>()
    .Configure(options =>
    {
    });
```
Option | Description |
---|---|
WaitUntilStarted | By default, MassTransit connects to the broker asynchronously. When set to true, the MassTransit Hosted Service will block startup until the broker connection has been established. |
StartTimeout | By default, MassTransit waits indefinitely until the broker connection is established. If specified, MassTransit will give up after the timeout has expired. |
StopTimeout | By default, MassTransit waits indefinitely for the bus to stop, including any active message consumers. If specified, MassTransit will force the bus to stop after the timeout has expired. |
ConsumerStopTimeout | If specified, the ConsumeContext.CancellationToken will be canceled after the specified timeout when the bus is stopping. This allows long-running consumers to observe the cancellation token and react accordingly. Must be less than or equal to StopTimeout. |
The .NET Generic Host has its own internal shutdown timeout.
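The following sketch sets each of the options from the table above and also adjusts the generic host's own shutdown timeout through HostOptions.ShutdownTimeout (part of Microsoft.Extensions.Hosting). The timeout values and the in-memory transport are arbitrary choices for illustration. Keeping the host's shutdown timeout at least as long as StopTimeout is an assumption made here, not a documented requirement.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddMassTransit(x =>
        {
            // In-memory transport, used only to keep the sketch broker-free
            x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
        });

        services.AddOptions<MassTransitHostOptions>()
            .Configure(options =>
            {
                // Block host startup until the bus has connected
                options.WaitUntilStarted = true;
                // Give up if the connection is not established in time
                options.StartTimeout = TimeSpan.FromSeconds(30);
                // Force the bus to stop after this long
                options.StopTimeout = TimeSpan.FromSeconds(60);
                // Cancel consumers earlier; must not exceed StopTimeout
                options.ConsumerStopTimeout = TimeSpan.FromSeconds(45);
            });

        // The generic host's own shutdown timeout (assumed here to be
        // best kept at least as long as StopTimeout)
        services.Configure<HostOptions>(options =>
        {
            options.ShutdownTimeout = TimeSpan.FromSeconds(90);
        });
    })
    .Build();

await host.RunAsync();
```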
Transport Options
Each supported transport can be configured via a .Host() method or via the .NET Options Pattern.
- RabbitMQ: RabbitMqTransportOptions
- Azure Service Bus: AzureServiceBusTransportOptions
- Amazon SQS: AmazonSqsTransportOptions
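As a rough sketch of the two approaches for RabbitMQ, the example below configures the broker connection through RabbitMqTransportOptions and shows the equivalent .Host() call in a comment. The host name, virtual host, and guest credentials are placeholder values for a local broker, not recommended settings.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

// Option 1: the Options pattern
services.AddOptions<RabbitMqTransportOptions>()
    .Configure(options =>
    {
        options.Host = "localhost";
        options.VHost = "/";
        options.User = "guest";
        options.Pass = "guest";
    });

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        // Option 2: configure the host directly instead of using options
        // cfg.Host("localhost", "/", h =>
        // {
        //     h.Username("guest");
        //     h.Password("guest");
        // });

        cfg.ConfigureEndpoints(context);
    });
});
```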
Consumer Registration
To consume messages, one or more consumers must be added and receive endpoints configured for the added consumers. MassTransit connects each receive endpoint to a queue on the message broker.
To add a consumer and automatically configure a receive endpoint for the consumer, call one of the AddConsumer methods and call ConfigureEndpoints as shown below.
```csharp
services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
```
MassTransit will automatically configure a receive endpoint for the SubmitOrderConsumer using the name returned by the configured endpoint name formatter. When the bus is started, the receive endpoint will be started and messages will be delivered from the queue by the transport to an instance of the consumer.
All consumer types can be added, including consumers, sagas, saga state machines, and routing slip activities. If a job consumer is added, additional configuration is required.
To exclude a consumer, saga, or routing slip activity from automatic configuration, use the ExcludeFromConfigureEndpoints extension method when adding the consumer:
```csharp
x.AddConsumer<SubmitOrderConsumer>()
    .ExcludeFromConfigureEndpoints();
```
Alternatively, the ExcludeFromConfigureEndpoints attribute may be specified on the consumer.
```csharp
[ExcludeFromConfigureEndpoints]
public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
}
```
Configure Endpoints
As shown in the example above, using ConfigureEndpoints is the preferred approach to configure receive endpoints. By registering consumers, sagas, and routing slip activities along with their optional definitions, MassTransit is able to configure receive endpoints for all registered consumer types. Receive endpoint names are generated using an endpoint name formatter (unless otherwise specified in a definition), and each receive endpoint is configured.
As receive endpoints are configured, one or more consumer types are configured on each receive endpoint. If multiple consumer types share the same endpoint name, those consumer types will be configured on the same receive endpoint. For each consumer type, its respective consumer, saga, or activity definition will be applied to the receive endpoint.
Configure Endpoints Callback
To apply receive endpoint settings or configure middleware for all receive endpoints configured by ConfigureEndpoints, a callback can be added.
```csharp
x.AddConfigureEndpointsCallback((name, cfg) =>
{
    cfg.UseMessageRetry(r => r.Immediate(2));
});
```
When ConfigureEndpoints is called, any registered callbacks will be called for every receive endpoint. Each callback will only be called once per receive endpoint.
To conditionally apply transport-specific settings, the cfg parameter can be pattern-matched to the transport type as shown below.
```csharp
x.AddConfigureEndpointsCallback((name, cfg) =>
{
    if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
        rmq.SetQuorumQueue(3);

    cfg.UseMessageRetry(r => r.Immediate(2));
});
```
Endpoint Strategies
Deciding how to configure receive endpoints in your application can be easy or hard, depending upon how much energy you want to spend being concerned with things that usually don't matter. However, there are nuances to the following approaches that should be considered.
One Consumer for Each Queue
Creates a queue for each registered consumer, saga, and routing slip activity. Separate queues are created for execute and compensate if compensation is supported by the activity.
Multiple Consumers on a Single Queue
Configuring multiple consumers on a single queue is fully supported by MassTransit and may make sense in certain circumstances. However, proceed with caution, as there are limitations to this approach.
The recommendation here is to configure multiple consumers on a single queue only when those consumers are closely related in terms of business function and each consumer consumes distinct message types. An example might be consumers that each create, update, or delete an entity when the dependencies of those operations are different – create and update may depend upon a validation component, while delete may not share that dependency.
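A sketch of that recommendation, assuming two closely related consumers that each handle a distinct message type, might configure both on one explicitly named receive endpoint. The address-service queue name, the message records, and the consumer classes are all hypothetical and exist only to make the example complete.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    x.AddConsumer<CreateAddressConsumer>();
    x.AddConsumer<DeleteAddressConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        // Both consumers share one queue; each consumes a distinct message type
        cfg.ReceiveEndpoint("address-service", e =>
        {
            e.ConfigureConsumer<CreateAddressConsumer>(context);
            e.ConfigureConsumer<DeleteAddressConsumer>(context);
        });
    });
});

// Hypothetical message contracts
public record CreateAddress(string Street);
public record DeleteAddress(Guid AddressId);

// Hypothetical consumers with placeholder bodies
public class CreateAddressConsumer : IConsumer<CreateAddress>
{
    public Task Consume(ConsumeContext<CreateAddress> context) => Task.CompletedTask;
}

public class DeleteAddressConsumer : IConsumer<DeleteAddress>
{
    public Task Consume(ConsumeContext<DeleteAddress> context) => Task.CompletedTask;
}
```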
Consume Multiple Message Types
In situations where it is preferable to consume multiple message types from a single queue, create a consumer that consumes multiple message types by adding more IConsumer<T> interface implementations to the consumer class.
```csharp
public class AddressConsumer :
    IConsumer<CreateAddress>,
    IConsumer<UpdateAddress>
{
}
```
Sagas follow this approach, creating a single queue for each saga and configuring the broker to route message types consumed by the saga that are published to topics/exchanges to the saga’s queue.
All Consumers on a Single Queue
This is never a good idea and is highly discouraged. While it is supported by MassTransit, it’s unlikely to be operationally sustainable.
Routing slip activities must not be configured on a single queue as they will not work properly.
Endpoint Name Formatters
ConfigureEndpoints uses an IEndpointNameFormatter to format the queue names for all supported consumer types. The default endpoint name formatter returns PascalCase class names without the namespace. There are several built-in endpoint name formatters included. For the SubmitOrderConsumer, the receive endpoint names would be formatted as shown below. Note that class suffixes such as Consumer, Saga, and Activity are trimmed from the endpoint name by default.
Format | Configuration | Name |
---|---|---|
Default | SetDefaultEndpointNameFormatter | SubmitOrder |
Snake Case | SetSnakeCaseEndpointNameFormatter | submit_order |
Kebab Case | SetKebabCaseEndpointNameFormatter | submit-order |
The endpoint name formatters can also be customized by constructing a new instance and configuring MassTransit to use it.
```csharp
x.SetEndpointNameFormatter(new KebabCaseEndpointNameFormatter(prefix: "Dev", includeNamespace: false));
```
By specifying a prefix, the endpoint name would be dev-submit-order. This is useful when sharing a single broker with multiple developers (Amazon SQS is account-wide, for instance).
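To preview the name a formatter would produce without starting a bus, the formatter can be asked directly through the Consumer<T> method on IEndpointNameFormatter. The sketch below assumes a placeholder SubmitOrder message and an empty consumer body. Based on the description above, the printed name should be the prefixed kebab-case form.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

var formatter = new KebabCaseEndpointNameFormatter(prefix: "Dev", includeNamespace: false);

// Ask the formatter for the endpoint name it would generate for the consumer
Console.WriteLine(formatter.Consumer<SubmitOrderConsumer>());

// Placeholder message and consumer, defined only so the sketch is complete
public record SubmitOrder(Guid OrderId);

public class SubmitOrderConsumer : IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context) => Task.CompletedTask;
}
```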
When using MultiBus with different endpoint name formatters for each bus...
Receive Endpoints
The previous examples use conventions to configure receive endpoints. Alternatively, receive endpoints can be explicitly configured.
When configuring endpoints manually, ConfigureEndpoints should be excluded or be called after any explicitly configured receive endpoints.
To explicitly configure endpoints, use the ConfigureConsumer or ConfigureConsumers method.
```csharp
services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ReceiveEndpoint("order-service", e =>
        {
            e.ConfigureConsumer<SubmitOrderConsumer>(context);
        });
    });
});
```
Receive endpoints have transport-independent settings that can be configured.
Name | Description | Default |
---|---|---|
PrefetchCount | Number of unacknowledged messages delivered by the broker | max(CPU Count x 2,16) |
ConcurrentMessageLimit | Number of concurrent messages delivered to consumers | (none, uses PrefetchCount) |
ConfigureConsumeTopology | Create exchanges/topics on the broker and bind them to the receive endpoint | true |
ConfigureMessageTopology | Create exchanges/topics on the broker and bind them to the receive endpoint for a specific message type | true |
PublishFaults | Publish Fault<T> events when consumers fault | true |
DefaultContentType | The default content type for received messages | See serialization |
SerializerContentType | The default content type for sending/publishing messages | See serialization |
The PrefetchCount, ConcurrentMessageLimit, and serialization settings can be specified at the bus level and will be applied to all receive endpoints.
In the following example, the PrefetchCount is set to 32 and the ConcurrentMessageLimit is set to 28.
```csharp
services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.PrefetchCount = 32; // applies to all receive endpoints

        cfg.ReceiveEndpoint("order-service", e =>
        {
            e.ConcurrentMessageLimit = 28; // only applies to this endpoint
            e.ConfigureConsumer<SubmitOrderConsumer>(context);
        });
    });
});
```
When using ConfigureConsumer with a consumer that has a definition, the EndpointName, PrefetchCount, and Temporary properties of the consumer definition are not used.
Temporary Endpoints
Some consumers only need to receive messages while connected, and any messages published while disconnected should be discarded. This can be achieved by using a TemporaryEndpointDefinition to configure the receive endpoint.
```csharp
services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingInMemory((context, cfg) =>
    {
        cfg.ReceiveEndpoint(new TemporaryEndpointDefinition(), e =>
        {
            e.ConfigureConsumer<SubmitOrderConsumer>(context);
        });

        cfg.ConfigureEndpoints(context);
    });
});
```
Consumer Definition
A consumer definition is used to configure the receive endpoint and pipeline behavior for the consumer. When scanning assemblies or namespaces for consumers, consumer definitions are also found and added to the container. The SubmitOrderConsumer and matching definition are shown below.
```csharp
class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    readonly ILogger<SubmitOrderConsumer> _logger;

    public SubmitOrderConsumer(ILogger<SubmitOrderConsumer> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        _logger.LogInformation("Order Submitted: {OrderId}", context.Message.OrderId);

        await context.Publish<OrderSubmitted>(new
        {
            context.Message.OrderId
        });
    }
}

class SubmitOrderConsumerDefinition :
    ConsumerDefinition<SubmitOrderConsumer>
{
    public SubmitOrderConsumerDefinition()
    {
        // override the default endpoint name
        EndpointName = "order-service";

        // limit the number of messages consumed concurrently
        // this applies to the consumer only, not the endpoint
        ConcurrentMessageLimit = 8;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
    {
        // configure message retry with millisecond intervals
        endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));

        // use the outbox to prevent duplicate events from being published
        endpointConfigurator.UseInMemoryOutbox();
    }
}
```
Endpoint Configuration
To configure the endpoint for a consumer registration, or override the endpoint configuration in the definition, the Endpoint method can be added to the consumer registration. This will create an endpoint definition for the consumer, and register it in the container. This method is available on consumer and saga registrations, with separate execute and compensate endpoint methods for activities.
```csharp
services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()
        .Endpoint(e =>
        {
            // override the default endpoint name
            e.Name = "order-service-extreme";

            // specify the endpoint as temporary (may be non-durable, auto-delete, etc.)
            e.Temporary = false;

            // specify an optional concurrent message limit for the consumer
            e.ConcurrentMessageLimit = 8;

            // only use if needed, a sensible default is provided, and a reasonable
            // value is automatically calculated based upon ConcurrentMessageLimit if
            // the transport supports it.
            e.PrefetchCount = 16;

            // set if each service instance should have its own endpoint for the consumer
            // so that messages fan out to each instance.
            e.InstanceId = "something-unique";

            // If you want to prevent the consumer from creating topics/exchanges
            // for consumed message types when started.
            e.ConfigureConsumeTopology = false;
        });

    x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
});
```
When the endpoint is configured after the AddConsumer method, the configuration overrides the endpoint configuration in the consumer definition. However, it cannot override the EndpointName if it is specified in the constructor. The order of precedence for endpoint naming, from highest to lowest, is explained below.
- Specifying EndpointName = "submit-order-extreme" in the constructor, which cannot be overridden
  ```csharp
  x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();

  public SubmitOrderConsumerDefinition()
  {
      EndpointName = "submit-order-extreme";
  }
  ```
- Specifying .Endpoint(e => e.Name = "submit-order-extreme") in the consumer registration, chained to AddConsumer
  ```csharp
  x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()
      .Endpoint(e => e.Name = "submit-order-extreme");

  public SubmitOrderConsumerDefinition()
  {
      Endpoint(e => e.Name = "not used");
  }
  ```
- Specifying Endpoint(e => e.Name = "submit-order-extreme") in the constructor, which creates an endpoint definition
  ```csharp
  x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();

  public SubmitOrderConsumerDefinition()
  {
      Endpoint(e => e.Name = "submit-order-extreme");
  }
  ```
- Unspecified, the endpoint name formatter is used (in this case, the endpoint name is SubmitOrder using the default formatter)
  ```csharp
  x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();

  public SubmitOrderConsumerDefinition()
  {
  }
  ```
Saga Registration
To add a state machine saga, use the AddSagaStateMachine methods. For a consumer saga, use the AddSaga methods.
```csharp
services.AddMassTransit(r =>
{
    // add a state machine saga, with the in-memory repository
    r.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();

    // add a consumer saga with the in-memory repository
    r.AddSaga<OrderSaga>()
        .InMemoryRepository();

    // add a saga by type, without a repository. The repository should be registered
    // in the container elsewhere
    r.AddSaga(typeof(OrderSaga));

    // add a state machine saga by type, including a saga definition for that saga
    r.AddSagaStateMachine(typeof(OrderState), typeof(OrderStateDefinition));

    // add all saga state machines in the specified assembly
    r.AddSagaStateMachines(Assembly.GetExecutingAssembly());

    // add all sagas in the specified assembly
    r.AddSagas(Assembly.GetExecutingAssembly());

    // add sagas from the namespace containing the type
    r.AddSagasFromNamespaceContaining<OrderSaga>();
    r.AddSagasFromNamespaceContaining(typeof(OrderSaga));
});
```
To add a saga registration and configure the consumer endpoint in the same expression, a definition can automatically be created.
```csharp
services.AddMassTransit(r =>
{
    r.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .NHibernateRepository()
        .Endpoint(e =>
        {
            e.Name = "order-state";
            e.ConcurrentMessageLimit = 8;
        });
});
```
Supported saga persistence storage engines are documented in the saga documentation section.
```csharp
services.AddMassTransit(x =>
{
    x.AddConsumer<ValueEnteredEventConsumer>();

    x.SetKebabCaseEndpointNameFormatter();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
```
And the consumer:
```csharp
class ValueEnteredEventConsumer :
    IConsumer<ValueEntered>
{
    readonly ILogger<ValueEnteredEventConsumer> _logger;

    public ValueEnteredEventConsumer(ILogger<ValueEnteredEventConsumer> logger)
    {
        _logger = logger;
    }

    // nothing is awaited, so return a completed task instead of using async
    public Task Consume(ConsumeContext<ValueEntered> context)
    {
        _logger.LogInformation("Value: {Value}", context.Message.Value);
        return Task.CompletedTask;
    }
}
```
An ASP.NET Core application can also configure receive endpoints. The consumer, along with the receive endpoint, is configured within the AddMassTransit configuration. Separate registration of the consumer is not required (and is discouraged); however, any consumer dependencies should be added to the container separately. Consumers are registered as scoped, and dependencies should be registered as scoped when possible, unless they are singletons.
```csharp
services.AddMassTransit(x =>
{
    x.AddConsumer<EventConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ReceiveEndpoint("event-listener", e =>
        {
            e.ConfigureConsumer<EventConsumer>(context);
        });
    });
});
```
```csharp
class EventConsumer :
    IConsumer<ValueEntered>
{
    readonly ILogger<EventConsumer> _logger;

    public EventConsumer(ILogger<EventConsumer> logger)
    {
        _logger = logger;
    }

    // nothing is awaited, so return a completed task instead of using async
    public Task Consume(ConsumeContext<ValueEntered> context)
    {
        _logger.LogInformation("Value: {Value}", context.Message.Value);
        return Task.CompletedTask;
    }
}
```
Health Checks
The AddMassTransit method adds an IHealthCheck to the service collection that you can use to monitor the health of the bus. The health check is added with the tags ready and masstransit.
To configure health checks, map the ready and live endpoints in your ASP.NET application.
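```csharp
// HealthCheckOptions is defined in this namespace
using Microsoft.AspNetCore.Diagnostics.HealthChecks;

app.MapHealthChecks("/health/ready", new HealthCheckOptions()
{
    Predicate = (check) => check.Tags.Contains("ready"),
});

app.MapHealthChecks("/health/live", new HealthCheckOptions());
```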
Example Output
```json
{
  "status": "Healthy",
  "totalDuration": "00:00:00.2134026",
  "entries": {
    "masstransit-bus": {
      "data": {
        "Endpoints": {
          "rabbitmq://localhost/dev-local/SubmitOrder": {
            "status": "Healthy",
            "description": "ready"
          }
        }
      },
      "description": "Ready",
      "duration": "00:00:00.1853530",
      "status": "Healthy",
      "tags": [
        "ready",
        "masstransit"
      ]
    }
  }
}
```
- When everything works correctly, MassTransit will report Healthy.
- If any problems occur on application startup, MassTransit will report Unhealthy. This can cause an orchestrator to restart your application.
- If any problems occur while the application is working (for example, the application loses its connection to the broker), MassTransit will report Degraded.
Health Check Options
Health Checks can be further configured using ConfigureHealthCheckOptions:
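```csharp
// HealthStatus is defined in this namespace
using Microsoft.Extensions.Diagnostics.HealthChecks;

builder.Services.AddMassTransit(bus =>
{
    bus.ConfigureHealthCheckOptions(options =>
    {
        options.Name = "masstransit";
        options.MinimalFailureStatus = HealthStatus.Unhealthy;
        options.Tags.Add("health");
    });
});
```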
Setting | Description | Default value |
---|---|---|
Name | Set the health check name, overrides the default bus type name. | Bus name. |
MinimalFailureStatus | The minimal HealthStatus that will be reported when the health check fails. | Unhealthy |
Tags | A list of tags that can be used to filter sets of health checks. | "ready", "masstransit" |
By default, MassTransit reports all three statuses depending on application state.
If MinimalFailureStatus is set to Healthy, MassTransit will log any issues, but the health check will always report Healthy.
If MinimalFailureStatus is set to Degraded, MassTransit will report Degraded if any issues occur, but never report Unhealthy.
Tags inside options will override default tags. You will need to add ready and masstransit tags manually if you want to keep them.
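Since custom tags replace the defaults, a configuration that keeps the ready and masstransit tags while adding one more might look like the sketch below. The messaging tag and the in-memory transport are hypothetical choices for illustration.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(bus =>
{
    bus.ConfigureHealthCheckOptions(options =>
    {
        // Re-add the default tags, since specifying tags overrides them
        options.Tags.Add("ready");
        options.Tags.Add("masstransit");

        // Hypothetical additional tag used to group messaging checks
        options.Tags.Add("messaging");
    });

    // In-memory transport, used only to keep the sketch broker-free
    bus.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
});
```

With the ready tag preserved, a /health/ready endpoint that filters on that tag would continue to include the MassTransit check.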