MassTransit can be used in most .NET application types and is easily configured in ASP.NET Core or .NET Generic Host applications (using .NET 6 or later).

To use MassTransit, add the MassTransit package (from NuGet) and start with the AddMassTransit method shown below.

using MassTransit;

services.AddMassTransit(x =>
{
    // A Transport
    x.UsingRabbitMq((context, cfg) =>
    {
    });
});

In this configuration, the following variables are used:

  • x (IBusRegistrationConfigurator): Configure the bus instance (not transport specific) and the underlying service collection
  • context (IBusRegistrationContext): The configured bus context, also implements IServiceProvider
  • cfg (IRabbitMqBusFactoryConfigurator): Configure the bus specific to the transport (each transport has its own interface type)

The callback passed to the UsingRabbitMq method is invoked after the service collection has been built. Any methods to configure the bus instance (using x) should be called outside of this callback.
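
The placement rule above can be illustrated with a minimal sketch. The Ping message, PingConsumer, and the AddMessaging extension method are hypothetical names introduced only for this example, and the MassTransit.RabbitMQ package is assumed to be referenced.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace Sample.Messaging
{
    // Hypothetical message and consumer, defined only for this sketch.
    public record Ping(string Text);

    public class PingConsumer : IConsumer<Ping>
    {
        public Task Consume(ConsumeContext<Ping> context) => Task.CompletedTask;
    }

    public static class MessagingRegistration
    {
        // Hypothetical helper that wraps the AddMassTransit call.
        public static IServiceCollection AddMessaging(this IServiceCollection services)
        {
            return services.AddMassTransit(x =>
            {
                // Bus-level configuration uses x and stays outside the transport callback.
                x.SetKebabCaseEndpointNameFormatter();
                x.AddConsumer<PingConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    // Transport-specific configuration uses cfg;
                    // context is used to resolve what was registered on x.
                    cfg.ConfigureEndpoints(context);
                });
            });
        }
    }
}
```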

Adding MassTransit, as shown above, will configure the service collection with required components, including:

  • Several interfaces (and their implementations, appropriate for the transport specified)
    • IBus (singleton)
    • IBusControl (singleton)
    • IReceiveEndpointConnector (singleton)
    • ISendEndpointProvider (scoped)
    • IPublishEndpoint (scoped)
    • IRequestClient<T> (scoped)
  • The bus endpoint with the default settings (not started by default)
  • The MassTransitHostedService
  • Health checks for the bus (or buses) and receive endpoints
  • Using ILoggerFactory for log output
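
As a rough illustration of how the registered interfaces are consumed, the sketch below injects IPublishEndpoint into an application class. OrderReceived and OrderService are hypothetical names, not part of MassTransit. Because IPublishEndpoint is registered as scoped, the class that depends on it should itself be scoped or transient; a singleton would more likely depend on IBus.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

namespace Sample.Messaging
{
    // Hypothetical event contract used only in this sketch.
    public record OrderReceived(Guid OrderId);

    // Hypothetical application service; register it as scoped, for example
    // services.AddScoped<OrderService>();
    public class OrderService
    {
        readonly IPublishEndpoint _publishEndpoint;

        public OrderService(IPublishEndpoint publishEndpoint)
        {
            _publishEndpoint = publishEndpoint;
        }

        public Task Receive(Guid orderId, CancellationToken cancellationToken = default)
        {
            // Publishes the event to all subscribed receive endpoints.
            return _publishEndpoint.Publish(new OrderReceived(orderId), cancellationToken);
        }
    }
}
```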

To configure multiple bus instances in the same service collection, refer to the MultiBus section.

Host Options

MassTransit adds a hosted service so that the generic host can start and stop the bus (or buses, if multiple bus instances are configured). The host options can be configured via MassTransitHostOptions using the Options pattern as shown below.

services.AddOptions<MassTransitHostOptions>()
    .Configure(options =>
    {
    });

  • WaitUntilStarted: By default, MassTransit connects to the broker asynchronously. When set to true, the MassTransit Hosted Service will block startup until the broker connection has been established.
  • StartTimeout: By default, MassTransit waits infinitely until the broker connection is established. If specified, MassTransit will give up after the timeout has expired.
  • StopTimeout: MassTransit waits infinitely for the bus to stop, including any active message consumers. If specified, MassTransit will force the bus to stop after the timeout has expired.
  • ConsumerStopTimeout: If specified, the ConsumeContext.CancellationToken will be canceled after the specified timeout when the bus is stopping. This allows long-running consumers to observe the cancellation token and react accordingly. Must be less than or equal to the StopTimeout.

The .NET Generic Host has its own internal shutdown timeout.
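
A minimal sketch of the host options with values filled in follows. The timeout values and the AddBusHostOptions helper are illustrative assumptions. The last statement raises the Generic Host's own ShutdownTimeout, on the assumption that it should be at least as long as StopTimeout so the host does not give up before the bus has stopped.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Sample.Messaging
{
    public static class HostOptionsRegistration
    {
        // Hypothetical helper; the timeout values are examples, not recommendations.
        public static IServiceCollection AddBusHostOptions(this IServiceCollection services)
        {
            services.AddOptions<MassTransitHostOptions>()
                .Configure(options =>
                {
                    // Block host startup until the broker connection is established...
                    options.WaitUntilStarted = true;
                    // ...but give up after 30 seconds.
                    options.StartTimeout = TimeSpan.FromSeconds(30);
                    // Force the bus to stop after 60 seconds.
                    options.StopTimeout = TimeSpan.FromSeconds(60);
                    // Cancel the consumers' cancellation token after 30 seconds (<= StopTimeout).
                    options.ConsumerStopTimeout = TimeSpan.FromSeconds(30);
                });

            // The Generic Host shutdown timeout is configured separately from MassTransit.
            services.Configure<HostOptions>(options =>
            {
                options.ShutdownTimeout = TimeSpan.FromSeconds(70);
            });

            return services;
        }
    }
}
```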

Transport Options

Each supported transport can be configured via a .Host() method or via the .NET Options Pattern.
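
The two approaches might look like the following for RabbitMQ. This is a sketch that assumes a local broker with the guest account and a MassTransit.RabbitMQ version that ships RabbitMqTransportOptions; the helper method names are hypothetical.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace Sample.Messaging
{
    public static class TransportRegistration
    {
        // Approach 1: configure the broker connection with the Host method.
        public static IServiceCollection AddBusWithHostMethod(this IServiceCollection services)
        {
            return services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host("localhost", "/", h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });

                    cfg.ConfigureEndpoints(context);
                });
            });
        }

        // Approach 2: configure the broker connection with the Options pattern.
        public static IServiceCollection AddBusWithOptions(this IServiceCollection services)
        {
            services.AddOptions<RabbitMqTransportOptions>()
                .Configure(options =>
                {
                    options.Host = "localhost";
                    options.VHost = "/";
                    options.User = "guest";
                    options.Pass = "guest";
                });

            return services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
            });
        }
    }
}
```

The Options pattern has the advantage that the values can be bound from configuration instead of being set in code.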

Consumer Registration

To consume messages, one or more consumers must be added and receive endpoints configured for the added consumers. MassTransit connects each receive endpoint to a queue on the message broker.

To add a consumer and automatically configure a receive endpoint for the consumer, call one of the AddConsumer methods and call ConfigureEndpoints as shown below.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

ConfigureEndpoints should be the last method called after all settings and middleware components have been configured.

MassTransit will automatically configure a receive endpoint for the SubmitOrderConsumer using the name returned by the configured endpoint name formatter. When the bus is started, the receive endpoint will be started and messages will be delivered from the queue by the transport to an instance of the consumer.

All consumer types can be added, including consumers, sagas, saga state machines, and routing slip activities. If a job consumer is added, additional configuration is required.


To exclude a consumer, saga, or routing slip activity from automatic configuration, use the ExcludeFromConfigureEndpoints extension method when adding the consumer:

x.AddConsumer<SubmitOrderConsumer>()
    .ExcludeFromConfigureEndpoints()

Alternatively, the ExcludeFromConfigureEndpoints attribute may be specified on the consumer.

[ExcludeFromConfigureEndpoints]
public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
}

Configure Endpoints

As shown in the example above, using ConfigureEndpoints is the preferred approach to configure receive endpoints. By registering consumers, sagas, and routing slip activities along with their optional definitions, MassTransit is able to configure receive endpoints for all registered consumer types. Receive endpoint names are generated using an endpoint name formatter (unless otherwise specified in a definition), and each receive endpoint is configured.

As receive endpoints are configured, one or more consumer types are configured on each receive endpoint. If multiple consumer types share the same endpoint name, those consumer types will be configured on the same receive endpoint. For each consumer type, its respective consumer, saga, or activity definition will be applied to the receive endpoint.

If multiple consumer types share the same receive endpoint, and more than one of those consumer types have a matching definition that specifies the same middleware component, multiple filters may be configured! This may lead to unpredictable results, so caution is advised when configuring multiple consumer types on the same receive endpoint.

Configure Endpoints Callback

To apply receive endpoint settings or configure middleware for all receive endpoints configured by ConfigureEndpoints, a callback can be added.

x.AddConfigureEndpointsCallback((name, cfg) =>
{
    cfg.UseMessageRetry(r => r.Immediate(2));
});

When ConfigureEndpoints is called, any registered callbacks will be called for every receive endpoint. Each callback will only be called once per receive endpoint.

To conditionally apply transport-specific settings, the cfg parameter can be pattern-matched to the transport type as shown below.

x.AddConfigureEndpointsCallback((name, cfg) =>
{
    if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
        rmq.SetQuorumQueue(3);

    cfg.UseMessageRetry(r => r.Immediate(2));
});

Endpoint Name Formatters

ConfigureEndpoints uses an IEndpointNameFormatter to format the queue names for all supported consumer types. The default endpoint name formatter returns PascalCase class names without the namespace. There are several built-in endpoint name formatters included. For the SubmitOrderConsumer, the receive endpoint names would be formatted as shown below. Note that class suffixes such as Consumer, Saga, and Activity are trimmed from the endpoint name by default.

  • Snake Case (SetSnakeCaseEndpointNameFormatter): submit_order
  • Kebab Case (SetKebabCaseEndpointNameFormatter): submit-order

The endpoint name formatters can also be customized by constructing a new instance and configuring MassTransit to use it.

x.SetEndpointNameFormatter(new KebabCaseEndpointNameFormatter(prefix: "Dev"));

By specifying a prefix, the endpoint name would be dev-submit-order. This is useful when sharing a single broker with multiple developers (Amazon SQS is account-wide, for instance).
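
To make the naming convention concrete, the following stand-alone sketch reproduces the kebab-case transformation described above, including the suffix trimming and the prefix. It is an illustration written for this page, not MassTransit's implementation; EndpointNameSketch and ToKebabCase are hypothetical names.

```csharp
using System;
using System.Text.RegularExpressions;

public static class EndpointNameSketch
{
    // Class suffixes that are trimmed from the endpoint name by default.
    static readonly string[] Suffixes = { "Consumer", "Saga", "Activity" };

    public static string ToKebabCase(string typeName, string prefix = "")
    {
        // Trim the first matching suffix, unless the suffix is the whole name.
        foreach (var suffix in Suffixes)
        {
            if (typeName.EndsWith(suffix, StringComparison.Ordinal) && typeName.Length > suffix.Length)
            {
                typeName = typeName.Substring(0, typeName.Length - suffix.Length);
                break;
            }
        }

        // Insert a hyphen before every upper-case letter except the first character,
        // then lower-case the result.
        var combined = prefix + typeName;
        return Regex.Replace(combined, "(?<!^)([A-Z])", "-$1").ToLowerInvariant();
    }
}
```

With this sketch, ToKebabCase("SubmitOrderConsumer") returns submit-order, and ToKebabCase("SubmitOrderConsumer", "Dev") returns dev-submit-order.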

When using MultiBus with different endpoint name formatters for each bus...

Receive Endpoints

The previous examples use conventions to configure receive endpoints. Alternatively, receive endpoints can be explicitly configured.

When configuring endpoints manually, ConfigureEndpoints should be excluded or be called after any explicitly configured receive endpoints.

To explicitly configure endpoints, use the ConfigureConsumer or ConfigureConsumers method.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ReceiveEndpoint("order-service", e =>
        {
            e.ConfigureConsumer<SubmitOrderConsumer>(context);
        });
    });
});
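
Explicit and conventional configuration can be combined, as long as ConfigureEndpoints comes last. The sketch below assumes two hypothetical consumers (PlaceOrderConsumer and AuditOrderConsumer) and hypothetical message types; the expectation is that ConfigureEndpoints then configures only the consumers that were not already configured on an explicit receive endpoint.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace Sample.Messaging
{
    // Hypothetical messages and consumers, defined only for this sketch.
    public record PlaceOrder(Guid OrderId);
    public record AuditOrder(Guid OrderId);

    public class PlaceOrderConsumer : IConsumer<PlaceOrder>
    {
        public Task Consume(ConsumeContext<PlaceOrder> context) => Task.CompletedTask;
    }

    public class AuditOrderConsumer : IConsumer<AuditOrder>
    {
        public Task Consume(ConsumeContext<AuditOrder> context) => Task.CompletedTask;
    }

    public static class MixedEndpointRegistration
    {
        public static IServiceCollection AddMixedEndpoints(this IServiceCollection services)
        {
            return services.AddMassTransit(x =>
            {
                x.AddConsumer<PlaceOrderConsumer>();
                x.AddConsumer<AuditOrderConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    // Explicitly configured receive endpoint comes first.
                    cfg.ReceiveEndpoint("order-service", e =>
                    {
                        e.ConfigureConsumer<PlaceOrderConsumer>(context);
                    });

                    // Called last, so the remaining consumer gets a conventional endpoint.
                    cfg.ConfigureEndpoints(context);
                });
            });
        }
    }
}
```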

Receive endpoints have transport-independent settings that can be configured.

  • PrefetchCount: Number of unacknowledged messages delivered by the broker. Default: max(CPU Count x 2, 16)
  • ConcurrentMessageLimit: Number of concurrent messages delivered to consumers. Default: (none, uses PrefetchCount)
  • ConfigureConsumeTopology: Create exchanges/topics on the broker and bind them to the receive endpoint. Default: true
  • PublishFaults: Publish Fault<T> events when consumers fault. Default: true
  • DefaultContentType: The default content type for received messages. Default: See serialization
  • SerializerContentType: The default content type for sending/publishing messages. Default: See serialization

The PrefetchCount, ConcurrentMessageLimit, and serialization settings can be specified at the bus level and will be applied to all receive endpoints.
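
The PrefetchCount default listed above, max(CPU Count x 2, 16), can be worked through with a small stand-alone sketch. PrefetchSketch is a hypothetical helper written for illustration, not a MassTransit type.

```csharp
using System;

public static class PrefetchSketch
{
    // Mirrors the documented default: twice the CPU count, but never less than 16.
    public static int DefaultPrefetchCount(int processorCount)
    {
        return Math.Max(processorCount * 2, 16);
    }
}
```

On a 4-core machine the formula yields 16 (the lower bound applies), while on a 12-core machine it yields 24. Passing Environment.ProcessorCount gives the value for the current machine.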

In the following example, the PrefetchCount is set to 32 and the ConcurrentMessageLimit is set to 28.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.PrefetchCount = 32; // applies to all receive endpoints

        cfg.ReceiveEndpoint("order-service", e =>
        {
            e.ConcurrentMessageLimit = 28; // only applies to this endpoint
            e.ConfigureConsumer<SubmitOrderConsumer>(context);
        });
    });
});

When using ConfigureConsumer with a consumer that has a definition, the EndpointName, PrefetchCount, and Temporary properties of the consumer definition are not used.

Temporary Endpoints

Some consumers only need to receive messages while connected, and any messages published while disconnected should be discarded. This can be achieved by using a TemporaryEndpointDefinition to configure the receive endpoint.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingInMemory((context, cfg) =>
    {
        cfg.ReceiveEndpoint(new TemporaryEndpointDefinition(), e =>
        {
            e.ConfigureConsumer<SubmitOrderConsumer>(context);
        });

        cfg.ConfigureEndpoints(context);
    });
});

Consumer Definition

A consumer definition is used to configure the receive endpoint and pipeline behavior for the consumer. When scanning assemblies or namespaces for consumers, consumer definitions are also found and added to the container. The SubmitOrderConsumer and matching definition are shown below.

class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    readonly ILogger<SubmitOrderConsumer> _logger;

    public SubmitOrderConsumer(ILogger<SubmitOrderConsumer> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        _logger.LogInformation("Order Submitted: {OrderId}", context.Message.OrderId);

        await context.Publish<OrderSubmitted>(new
        {
            context.Message.OrderId
        });
    }
}

class SubmitOrderConsumerDefinition :
    ConsumerDefinition<SubmitOrderConsumer>
{
    public SubmitOrderConsumerDefinition()
    {
        // override the default endpoint name
        EndpointName = "order-service";

        // limit the number of messages consumed concurrently
        // this applies to the consumer only, not the endpoint
        ConcurrentMessageLimit = 8;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
    {
        // configure message retry with millisecond intervals
        endpointConfigurator.UseMessageRetry(r => r.Intervals(100,200,500,800,1000));

        // use the outbox to prevent duplicate events from being published
        endpointConfigurator.UseInMemoryOutbox();
    }
}
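
The assembly scanning mentioned above might be used as in the following sketch. CheckInventory, its consumer, and its definition are hypothetical types declared only so that the example is self-contained; the point is that AddConsumers is expected to pick up both the consumer and its matching definition from the scanned assembly.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace Sample.Messaging
{
    // Hypothetical message, consumer, and definition, defined only for this sketch.
    public record CheckInventory(string Sku);

    public class CheckInventoryConsumer : IConsumer<CheckInventory>
    {
        public Task Consume(ConsumeContext<CheckInventory> context) => Task.CompletedTask;
    }

    public class CheckInventoryConsumerDefinition : ConsumerDefinition<CheckInventoryConsumer>
    {
        public CheckInventoryConsumerDefinition()
        {
            // Override the endpoint name that the formatter would otherwise generate.
            EndpointName = "inventory-service";
        }
    }

    public static class ScanningRegistration
    {
        public static IServiceCollection AddScannedConsumers(this IServiceCollection services)
        {
            return services.AddMassTransit(x =>
            {
                // Scan the assembly for consumers and their definitions.
                x.AddConsumers(typeof(CheckInventoryConsumer).Assembly);

                // Alternative: limit the scan to a single namespace.
                // x.AddConsumersFromNamespaceContaining<CheckInventoryConsumer>();

                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
            });
        }
    }
}
```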

Endpoint Configuration

To configure the endpoint for a consumer registration, or override the endpoint configuration in the definition, the Endpoint method can be added to the consumer registration. This will create an endpoint definition for the consumer, and register it in the container. This method is available on consumer and saga registrations, with separate execute and compensate endpoint methods for activities.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition))
        .Endpoint(e =>
        {
            // override the default endpoint name
            e.Name = "order-service-extreme";

            // specify the endpoint as temporary (may be non-durable, auto-delete, etc.)
            e.Temporary = false;

            // specify an optional concurrent message limit for the consumer
            e.ConcurrentMessageLimit = 8;

            // only use if needed, a sensible default is provided, and a reasonable
            // value is automatically calculated based upon ConcurrentMessageLimit if
            // the transport supports it.
            e.PrefetchCount = 16;

            // set if each service instance should have its own endpoint for the consumer
            // so that messages fan out to each instance.
            e.InstanceId = "something-unique";
        });

    x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
});

When the endpoint is configured after the AddConsumer method, the configuration then overrides the endpoint configuration in the consumer definition. However, it cannot override the EndpointName if it is specified in the constructor. The order of precedence for endpoint naming is explained below.

  1. Specifying EndpointName = "submit-order-extreme" in the constructor, which cannot be overridden

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()

    public SubmitOrderConsumerDefinition()
    {
        EndpointName = "submit-order-extreme";
    }

  2. Specifying .Endpoint(x => x.Name = "submit-order-extreme") in the consumer registration, chained to AddConsumer

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()
        .Endpoint(x => x.Name = "submit-order-extreme");

    public SubmitOrderConsumerDefinition()
    {
        Endpoint(x => x.Name = "not used");
    }

  3. Specifying Endpoint(x => x.Name = "submit-order-extreme") in the constructor, which creates an endpoint definition

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()

    public SubmitOrderConsumerDefinition()
    {
        Endpoint(x => x.Name = "submit-order-extreme");
    }

  4. Unspecified, the endpoint name formatter is used (in this case, the endpoint name is SubmitOrder using the default formatter)

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()

    public SubmitOrderConsumerDefinition()
    {
    }

Saga Registration

To add a state machine saga, use the AddSagaStateMachine methods. For a consumer saga, use the AddSaga methods.

State machine sagas should be added before class-based sagas, and the class-based saga methods should not be used to add state machine sagas. This may be simplified in the future, but for now, be aware of this registration requirement.

services.AddMassTransit(r =>
{
    // add a state machine saga, with the in-memory repository
    r.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();

    // add a consumer saga with the in-memory repository
    r.AddSaga<OrderSaga>()
        .InMemoryRepository();

    // add a saga by type, without a repository. The repository should be registered
    // in the container elsewhere
    r.AddSaga(typeof(OrderSaga));

    // add a state machine saga by type, including a saga definition for that saga
    r.AddSagaStateMachine(typeof(OrderStateMachine), typeof(OrderStateDefinition));

    // add all saga state machines by type
    r.AddSagaStateMachines(Assembly.GetExecutingAssembly());

    // add all sagas in the specified assembly
    r.AddSagas(Assembly.GetExecutingAssembly());

    // add sagas from the namespace containing the type
    r.AddSagasFromNamespaceContaining<OrderSaga>();
    r.AddSagasFromNamespaceContaining(typeof(OrderSaga));
});

To add a saga registration and configure the saga's receive endpoint in the same expression, a definition can automatically be created.

services.AddMassTransit(r =>
{
    r.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .NHibernateRepository()
        .Endpoint(e =>
        {
            e.Name = "order-state";
            e.ConcurrentMessageLimit = 8;
        });
});

Supported saga persistence storage engines are documented in the saga documentation section.

services.AddMassTransit(x =>
{
    x.AddConsumer<ValueEnteredEventConsumer>();

    x.SetKebabCaseEndpointNameFormatter();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

And the consumer:

class ValueEnteredEventConsumer :
    IConsumer<ValueEntered>
{
    ILogger<ValueEnteredEventConsumer> _logger;

    public ValueEnteredEventConsumer(ILogger<ValueEnteredEventConsumer> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<ValueEntered> context)
    {
        _logger.LogInformation("Value: {Value}", context.Message.Value);
    }
}

An ASP.NET Core application can also configure receive endpoints. The consumer, along with the receive endpoint, is configured within the AddMassTransit configuration. Separate registration of the consumer is not required (and is discouraged); however, any consumer dependencies should be added to the container separately. Consumers are registered as scoped, and dependencies should be registered as scoped when possible, unless they are singletons.

services.AddMassTransit(x =>
{
    x.AddConsumer<EventConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ReceiveEndpoint("event-listener", e =>
        {
            e.ConfigureConsumer<EventConsumer>(context);
        });
    });
});

class EventConsumer :
    IConsumer<ValueEntered>
{
    ILogger<EventConsumer> _logger;

    public EventConsumer(ILogger<EventConsumer> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<ValueEntered> context)
    {
        _logger.LogInformation("Value: {Value}", context.Message.Value);
    }
}
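
The point about consumer dependencies can be sketched as follows. IValueStore, InMemoryValueStore, ValueRecorded, and StoringConsumer are hypothetical names introduced for this example; only the dependency is registered directly with the container, while the consumer itself is added through AddMassTransit.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace Sample.Messaging
{
    // Hypothetical dependency and message, defined only for this sketch.
    public interface IValueStore
    {
        Task Save(int value);
    }

    public class InMemoryValueStore : IValueStore
    {
        public Task Save(int value) => Task.CompletedTask;
    }

    public record ValueRecorded(int Value);

    public class StoringConsumer : IConsumer<ValueRecorded>
    {
        readonly IValueStore _store;

        public StoringConsumer(IValueStore store)
        {
            _store = store;
        }

        public Task Consume(ConsumeContext<ValueRecorded> context)
        {
            return _store.Save(context.Message.Value);
        }
    }

    public static class DependencyRegistration
    {
        public static IServiceCollection AddStoringConsumer(this IServiceCollection services)
        {
            // The dependency is registered separately, as scoped to match the consumer.
            services.AddScoped<IValueStore, InMemoryValueStore>();

            // The consumer is registered only through AddMassTransit.
            return services.AddMassTransit(x =>
            {
                x.AddConsumer<StoringConsumer>();

                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
            });
        }
    }
}
```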

Health Checks

The AddMassTransit method adds an IHealthCheck to the service collection that you can use to monitor the health of the bus. The health check is added with the tags ready and masstransit.

To configure health checks, map the ready and live endpoints in your ASP.NET application.

app.UseEndpoints(endpoints =>
{
    endpoints.MapHealthChecks("/health/ready", new HealthCheckOptions()
    {
        Predicate = (check) => check.Tags.Contains("ready"),
    });

    endpoints.MapHealthChecks("/health/live", new HealthCheckOptions());
});

Example Output

{
  "status": "Healthy",
  "totalDuration": "00:00:00.2134026",
  "entries": {
    "masstransit-bus": {
      "data": {
        "Endpoints": {
          "rabbitmq://localhost/dev-local/SubmitOrder": {
            "status": "Healthy",
            "description": "ready"
          }
        }
      },
      "description": "Ready",
      "duration": "00:00:00.1853530",
      "status": "Healthy",
      "tags": [
        "ready",
        "masstransit"
      ]
    }
  }
}
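
The default ASP.NET Core health check response is a plain-text status, so JSON output of this shape requires a custom response writer. The sketch below is one possible writer, not the one used to produce the output above; HealthCheckJson and ReadyOptions are hypothetical names, and the exact serialization of the data entries depends on what each health check reports.

```csharp
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Diagnostics.HealthChecks;

public static class HealthCheckJson
{
    // Options for the ready endpoint: filter by tag and write the report as JSON.
    public static HealthCheckOptions ReadyOptions()
    {
        return new HealthCheckOptions
        {
            Predicate = check => check.Tags.Contains("ready"),
            ResponseWriter = WriteJson
        };
    }

    static Task WriteJson(HttpContext context, HealthReport report)
    {
        // Project the report into an anonymous object that System.Text.Json can serialize.
        var payload = new
        {
            status = report.Status.ToString(),
            totalDuration = report.TotalDuration.ToString(),
            entries = report.Entries.ToDictionary(
                entry => entry.Key,
                entry => new
                {
                    data = entry.Value.Data,
                    description = entry.Value.Description,
                    duration = entry.Value.Duration.ToString(),
                    status = entry.Value.Status.ToString(),
                    tags = entry.Value.Tags
                })
        };

        // Sets the content type to application/json and writes the body.
        return context.Response.WriteAsJsonAsync(payload);
    }
}
```

The options would then be passed when mapping the endpoint, for example endpoints.MapHealthChecks("/health/ready", HealthCheckJson.ReadyOptions()).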