MultiBus

pronounced mool-tee-buss

MassTransit is designed so that most applications only need a single bus, and that is the recommended approach. Using a single bus, with however many receive endpoints are needed, minimizes complexity and ensures efficient broker resource utilization. Consistent with this guidance, container configuration using the AddMassTransit method registers the appropriate types so that they are available to other components, as well as consumers, sagas, and activities.

However, with broader use of cloud-based platforms comes a greater variety of messaging transports, not to mention HTTP as a transfer protocol. As application sophistication increases, connecting to multiple message transports and/or brokers is becoming more common. Therefore, rather than force developers to create their own solutions, MassTransit has the ability to configure additional bus instances within specific dependency injection containers.

And by specific, right now it is very specific: Microsoft.Extensions.DependencyInjection. Though technically, any container that supports IServiceCollection for configuration might work.

Standard Configuration

To review, the configuration for a single bus is shown below.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();
    x.AddRequestClient<SubmitOrder>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

This configures the container so that there is a bus, using RabbitMQ, with a single consumer SubmitOrderConsumer, using automatic endpoint configuration. The MassTransit hosted service, which configures the bus health checks and starts/stop the bus via IHostedService, is also added to the container.

When a consumer, a saga, or an activity is consuming a message the ConsumeContext is available in the container scope. When the consumer is created using the container, the consumer and any dependencies are created within that scope. If a dependency includes ISendEndpointProvider, IPublishEndpoint, or even ConsumeContext (should not be the first choice, but totally okay) on the constructor, all three of those interfaces result in the same reference which is great because it ensures that messages sent and/or published by the consumer or its dependencies includes the proper correlation identifiers and monitoring activity headers.

MultiBus Configuration

To support multiple bus instances in a single container, the interface behaviors described above had to be considered carefully. There are expectations as to how these interfaces behave, and it was important to ensure consistent behavior whether an application has one, two, or a dozen bus instances (please, not a dozen โ€“ think of the children). A way to differentiate between different bus instances ensuring that sent or published messages end up on the right queues or topics is needed. The ability to configure each bus instance separately, yet leverage the power of a single shared container is also a must.

To configure additional bus instances, create a new interface that includes IBus. Then, using that interface, configure the additional bus using the AddMassTransit<T> method, which is included in the MassTransit.MultiBus namespace.

public interface ISecondBus :
    IBus
{
}
services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();
    x.AddRequestClient<SubmitOrder>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

services.AddMassTransit<ISecondBus>(x =>
{
    x.AddConsumer<AllocateInventoryConsumer>();
    x.AddRequestClient<AllocateInventory>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("remote-host");

        cfg.ConfigureEndpoints(context);
    });
});

This configures the container so that there is an additional bus, using RabbitMQ, with a single consumer AllocateInventoryConsumer, using automatic endpoint configuration. Only a single hosted service is required that will start all bus instances so there is no need to add it twice.

Notable differences in the new method:

  • The generic argument, ISecondBus, is the type that will be added to the container instead of IBus. This ensures that access to the additional bus is directly available without confusion.

Using your MultiBus

For consumers or dependencies that need to send or publish messages to a different bus instance, a dependency on that specific bus interface (such as IBus, or ISecondBus) would be added.

Some things do not work across bus instances. As stated above, calling Send or Publish on an IBus (or other bus instance interface) starts a new conversation. Middleware components such as the InMemoryOutbox currently do not buffer messages across bus instances.

Controller

[ApiController]
[Route("/inventory")]
public class InventoryController : ControllerBase
{
    readonly Bind<ISecondBus, IPublishEndpoint> _publishEndpoint;

    public InventoryController(Bind<ISecondBus, IPublishEndpoint> publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    [HttpPost]
    public async Task<IActionResult> Post() 
    {
        // .. do stuff
    }
}

Razor Page

public class InventoryPage : PageModel
{
    public void OnPost([FromServices] Bind<ISecondBus, IPublishEndpoint> publishEndpoint)
    {
        await publishEndpoint.Value.Publish<AllocateInventory>(new 
        {
            SomeData = { }
        })
    }
}

Advanced Bus Types

In the example above, which should be the most common of this hopefully uncommon use, the ISecondBus interface is all that is required. MassTransit creates a dynamic class to delegate the IBus methods to the bus instance. However, it is possible to specify a class that implements ISecondBus instead.

To specify a class, as well as take advantage of the container to bring additional properties along with it, take a look at the following types and configuration.

public interface IThirdBus :
    IBus
{
}

class ThirdBus :
    BusInstance<IThirdBus>,
    IThirdBus
{
    public ThirdBus(IBusControl busControl, ISomeService someService)
        : base(busControl)
    {
        SomeService = someService;
    }

    public ISomeService SomeService { get; }
}

public interface ISomeService
{
}
services.AddMassTransit<IThirdBus>(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("third-host");
    });
});

This would add a third bus instance, the same as the second, but using the instance class specified. The class is resolved from the container and given IBusControl, which must be passed to the base class ensuring that it is properly configured.

Container Registration Details

To support a first class experience with Microsoft.Extensions.DependencyInjection MassTransit registers common interfaces for MultiBus instances using a Bind<TKey, TValue> that allows you to specify the owner of the type you are interested in. This allows you to access various pieces of MassTransit outside of a Consumer. Below are two tables that list out the various items you might be interested in.

First Bus

There are several interfaces added to the container using this configuration:

InterfaceLifestyleNotes
IBusControlSingletonUsed to start/stop the bus (not typically used)
IBusSingletonPublish/Send on this bus, starting a new conversation
ISendEndpointProviderScopedSend messages from consumer dependencies, ASP.NET Controllers
IPublishEndpointScopedPublish messages from consumer dependencies, ASP.NET Controllers
IClientFactorySingletonUsed to create request clients (singleton, or within scoped consumers)
IRequestClient<SubmitOrder>ScopedUsed to send requests
ConsumeContextScopedAvailable in any message scope, such as a consumer, saga, or activity

Multibus

The registered interfaces are slightly different for additional bus instances.

InterfaceLifestyleNotes
IBusControlN/ANot registered, but automatically started/stopped by the hosted service
IBusN/ANot registered, the new bus interface is registered instead
ISecondBusSingletonPublish/Send on this bus, starting a new conversation
ISendEndpointProviderScopedSend messages from consumer dependencies only
IPublishEndpointScopedPublish messages from consumer dependencies only
IClientFactoryN/ARegistered as an instance-specific client factory
IRequestClient<SubmitOrder>ScopedCreated using the specific bus instance
ConsumeContextScopedAvailable in any message scope, such as a consumer, saga, or activity
Bind<ISecondBus, ISendEndpointProvider>ScopedSend messages from controllers or outside of a consumer context
Bind<ISecondBus, IPublishEndpoint>ScopedPublish messages from controllers or outside of a consumer context
Bind<ISecondBus, IClientFactory>ScopedRegistered as an instance-specific client factory
Bind<ISecondBus, IRequestClient<SubmitOrder>>ScopedCreated using the bound bus instance