MultiBus

pronounced mool-tee-buss

MassTransit is designed so that most applications only need a single bus, and that is the recommended approach. Using a single bus, with however many receive endpoints are needed, minimizes complexity and ensures efficient broker resource utilization. Consistent with this guidance, container configuration using the AddMassTransit method registers the appropriate types so that they are available to other components, as well as consumers, sagas, and activities.

However, with broader use of cloud-based platforms comes a greater variety of messaging transports, not to mention HTTP as a transfer protocol. As application sophistication increases, connecting to multiple message transports and/or brokers is becoming more common. Therefore, rather than force developers to create their own solutions, MassTransit has the ability to configure additional bus instances within specific dependency injection containers.

And by specific, right now it is very specific: Microsoft.Extensions.DependencyInjection. Though technically, any container that supports IServiceCollection for configuration might work.

Standard Configuration

To review, the configuration for a single bus is shown below.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();
    x.AddRequestClient<SubmitOrder>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

This configures the container so that there is a bus, using RabbitMQ, with a single consumer, SubmitOrderConsumer, using automatic endpoint configuration. The MassTransit hosted service, which configures the bus health checks and starts/stops the bus via IHostedService, is also added to the container.

There are several interfaces added to the container using this configuration:

| Interface | Lifestyle | Notes |
|---|---|---|
| IBusControl | Singleton | Used to start/stop the bus (not typically used) |
| IBus | Singleton | Publish/Send on this bus, starting a new conversation |
| ISendEndpointProvider | Scoped | Send messages from consumer dependencies, ASP.NET Controllers |
| IPublishEndpoint | Scoped | Publish messages from consumer dependencies, ASP.NET Controllers |
| IClientFactory | Singleton | Used to create request clients (singleton, or within scoped consumers) |
| IRequestClient<SubmitOrder> | Scoped | Used to send requests |
| ConsumeContext | Scoped | Available in any message scope, such as a consumer, saga, or activity |
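As a rough illustration of how the scoped IRequestClient<SubmitOrder> from the table might be used, the sketch below injects it into a plain class. OrderSubmitter, OrderAccepted, and the shape of SubmitOrder are assumptions made for this example; a consumer that responds with OrderAccepted is assumed to exist somewhere on the bus.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts; the real SubmitOrder may look different.
public record SubmitOrder(Guid OrderId);
public record OrderAccepted(Guid OrderId);

// Hypothetical class resolved from a container scope (for example, by a controller).
public class OrderSubmitter
{
    readonly IRequestClient<SubmitOrder> _client;

    public OrderSubmitter(IRequestClient<SubmitOrder> client)
    {
        _client = client;
    }

    public async Task<Guid> Submit(Guid orderId)
    {
        // Sends the request and waits for an OrderAccepted response.
        Response<OrderAccepted> response =
            await _client.GetResponse<OrderAccepted>(new SubmitOrder(orderId));

        return response.Message.OrderId;
    }
}
```

Because the request client is scoped, a class like this would itself typically be registered as scoped, for instance with services.AddScoped<OrderSubmitter>().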

When a consumer, a saga, or an activity is consuming a message, the ConsumeContext is available in the container scope. When the consumer is created using the container, the consumer and any dependencies are created within that scope. If a dependency takes ISendEndpointProvider, IPublishEndpoint, or even ConsumeContext (should not be the first choice, but totally okay) on its constructor, all three of those interfaces resolve to the same reference. That is great, because it ensures that messages sent and/or published by the consumer or its dependencies include the proper correlation identifiers and monitoring activity headers.
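The scoping described above could look something like the following sketch, in which a consumer delegates publishing to a dependency that takes IPublishEndpoint on its constructor. OrderNotifier, OrderSubmitted, and the message shapes are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts used only for this sketch.
public record SubmitOrder(Guid OrderId);
public record OrderSubmitted(Guid OrderId);

// Hypothetical dependency, created in the same container scope as the consumer.
public class OrderNotifier
{
    readonly IPublishEndpoint _publishEndpoint;

    public OrderNotifier(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    // Published through the scoped endpoint, so the message is tied
    // to the message currently being consumed.
    public Task NotifySubmitted(Guid orderId)
    {
        return _publishEndpoint.Publish(new OrderSubmitted(orderId));
    }
}

public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    readonly OrderNotifier _notifier;

    public SubmitOrderConsumer(OrderNotifier notifier)
    {
        _notifier = notifier;
    }

    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        return _notifier.NotifySubmitted(context.Message.OrderId);
    }
}
```

For this to resolve, OrderNotifier would need to be registered with a scoped lifetime, for instance services.AddScoped<OrderNotifier>().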

MultiBus Configuration

To support multiple bus instances in a single container, the interface behaviors described above had to be considered carefully. There are expectations as to how these interfaces behave, and it was important to ensure consistent behavior whether an application has one, two, or a dozen bus instances (please, not a dozen; think of the children). There needs to be a way to differentiate between bus instances, so that sent or published messages end up on the right queues or topics. The ability to configure each bus instance separately, yet leverage the power of a single shared container, is also a must.

To configure additional bus instances, create a new interface that extends IBus. Then, using that interface, configure the additional bus using the AddMassTransit<T> method, which is included in the MassTransit.MultiBus namespace.

public interface ISecondBus :
    IBus
{
}

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();
    x.AddRequestClient<SubmitOrder>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

services.AddMassTransit<ISecondBus>(x =>
{
    x.AddConsumer<AllocateInventoryConsumer>();
    x.AddRequestClient<AllocateInventory>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("remote-host");

        cfg.ConfigureEndpoints(context);
    });
});

This configures the container so that there is an additional bus, using RabbitMQ, with a single consumer, AllocateInventoryConsumer, using automatic endpoint configuration. Only a single hosted service is required; it starts all bus instances, so there is no need to add it twice.

Notable differences in the new method:

  • The generic argument, ISecondBus, is the type that will be added to the container instead of IBus. This ensures that access to the additional bus is directly available without confusion.
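A class outside of any consumer could then take the new interface as a dependency, roughly as sketched below. InventoryRequester and InventoryRequested are hypothetical names that do not appear in the configuration above; ISecondBus is repeated only so the sketch is self-contained.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Same marker interface as in the configuration above.
public interface ISecondBus :
    IBus
{
}

// Hypothetical event contract used only for this sketch.
public record InventoryRequested(Guid OrderId, int Quantity);

// Hypothetical service that publishes on the additional bus.
public class InventoryRequester
{
    readonly ISecondBus _secondBus;

    public InventoryRequester(ISecondBus secondBus)
    {
        _secondBus = secondBus;
    }

    public Task Request(Guid orderId, int quantity)
    {
        // Goes to the broker configured for ISecondBus, not the default bus.
        return _secondBus.Publish(new InventoryRequested(orderId, quantity));
    }
}
```

Depending on ISecondBus rather than IBus makes it explicit at the constructor which broker the message is headed for.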

The registered interfaces are slightly different for additional bus instances.

| Interface | Lifestyle | Notes |
|---|---|---|
| IBusControl | N/A | Not registered, but automatically started/stopped by the hosted service |
| IBus | N/A | Not registered, the new bus interface is registered instead |
| ISecondBus | Singleton | Publish/Send on this bus, starting a new conversation |
| ISendEndpointProvider | Scoped | Send messages from consumer dependencies only |
| IPublishEndpoint | Scoped | Publish messages from consumer dependencies only |
| IClientFactory | N/A | Registered as an instance-specific client factory |
| IRequestClient<SubmitOrder> | Scoped | Created using the specific bus instance |
| ConsumeContext | Scoped | Available in any message scope, such as a consumer, saga, or activity |

Consumers or dependencies that need to send or publish messages to a different bus instance should take a dependency on that specific bus interface (such as IBus or ISecondBus).

Some things do not work across bus instances. As stated above, calling Send or Publish on an IBus (or other bus instance interface) starts a new conversation. Middleware components such as the InMemoryOutbox currently do not buffer messages across bus instances.
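A minimal sketch of crossing bus instances, assuming AllocateInventoryConsumer runs on the second bus and also needs to notify the default bus. InventoryAllocated and the message shapes are hypothetical, introduced only for this example.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts used only for this sketch.
public record AllocateInventory(Guid OrderId);
public record InventoryAllocated(Guid OrderId);

// Assumed to be registered on the second bus via AddMassTransit<ISecondBus>.
public class AllocateInventoryConsumer :
    IConsumer<AllocateInventory>
{
    readonly IBus _defaultBus;

    public AllocateInventoryConsumer(IBus defaultBus)
    {
        _defaultBus = defaultBus;
    }

    public async Task Consume(ConsumeContext<AllocateInventory> context)
    {
        var allocated = new InventoryAllocated(context.Message.OrderId);

        // Stays on the bus that delivered the message and remains
        // part of the current conversation.
        await context.Publish(allocated);

        // Crosses to the default bus; as described above, publishing
        // on a bus interface starts a new conversation.
        await _defaultBus.Publish(allocated, context.CancellationToken);
    }
}
```

Since the second publish is not tied to the consume context, any correlation between the two conversations would have to be carried in the message itself, here via OrderId.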

Bus Interface Types

In the example above, which should be the most common form of this hopefully uncommon scenario, the ISecondBus interface is all that is required. MassTransit creates a dynamic class to delegate the IBus methods to the bus instance. However, it is possible to specify a class that implements ISecondBus instead.

To specify a class, as well as take advantage of the container to bring additional properties along with it, take a look at the following types and configuration.

public interface IThirdBus :
    IBus
{
}

class ThirdBus :
    BusInstance<IThirdBus>,
    IThirdBus
{
    public ThirdBus(IBusControl busControl, ISomeService someService)
        : base(busControl)
    {
        SomeService = someService;
    }

    public ISomeService SomeService { get; }
}

public interface ISomeService
{
}

services.AddMassTransit<IThirdBus>(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("third-host");
    });
});

This would add a third bus instance, the same as the second, but using the instance class specified. The class is resolved from the container and given IBusControl, which must be passed to the base class to ensure that it is properly configured.