RabbitMQ Configuration

alt MassTransit on NuGet

With tens of thousands of users, RabbitMQ is one of the most popular open source message brokers. RabbitMQ is lightweight and easy to deploy on premises and in the cloud. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

MassTransit fully supports RabbitMQ, including many of the advanced features and capabilities.

To get started with RabbitMQ, refer to the configuration section which uses RabbitMQ in the examples.

Minimal Example

In the example below, which configures a receive endpoint, consumer, and message type, the bus is configured to use RabbitMQ.

namespace RabbitMqConsoleListener;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Hosting;

public static class Program
{
    public static async Task Main(string[] args)
    {
        await Host.CreateDefaultBuilder(args)
            .ConfigureServices(services =>
            {
                services.AddMassTransit(x =>
                {
                    x.UsingRabbitMq((context, cfg) =>
                    {
                        cfg.Host("localhost", "/", h =>
                        {
                            h.Username("guest");
                            h.Password("guest");
                        });
                    });
                });
            })
            .Build()
            .RunAsync();
    }
}

Broker Topology

With RabbitMQ, which supports exchanges and queues, messages are sent or published to exchanges and RabbitMQ routes those messages through exchanges to the appropriate queues.

When the bus is started, MassTransit will create exchanges and queues on the virtual host for the receive endpoint. MassTransit creates durable, fanout exchanges by default, and queues are also durable by default.

Configuration

The configuration includes:

  • The RabbitMQ host
    • Host name: localhost
    • Virtual host: /
    • User name and password used to connect to the virtual host (credentials are virtual-host specific)
  • The receive endpoint
    • Queue name: order-events-listener
    • Consumer: OrderSubmittedEventConsumer
      • Message type: OrderSystem.Events.OrderSubmitted
NameDescription
order-events-listenerQueue for the receive endpoint
order-events-listenerAn exchange, bound to the queue, used to send messages
OrderSystem.Events:OrderSubmittedAn exchange, named by the message-type, bound to the order-events-listener exchange, used to publish messages

When a message is sent, the endpoint address can be one of two values:

exchange:order-events-listener

Send the message to the order-events-listener exchange. If the exchange does not exist, it will be created. MassTransit translates topic: to exchange: when using RabbitMQ, so that topic: addresses can be resolved – since RabbitMQ is the only supported transport that doesn't have topics.

queue:order-events-listener

Send the message to the order-events-listener exchange. If the exchange or queue does not exist, they will be created and the exchange will be bound to the queue.

With either address, RabbitMQ will route the message from the order-events-listener exchange to the order-events-listener queue.

When a message is published, the message is sent to the OrderSystem.Events:OrderSubmitted exchange. If the exchange does not exist, it will be created. RabbitMQ will route the message from the OrderSystem.Events:OrderSubmitted exchange to the order-events-listener exchange, and subsequently to the order-events-listener queue. If other receive endpoints connected to the same virtual host include consumers that consume the OrderSubmitted message, a copy of the message would be routed to each of those endpoints as well.

If a message is published before starting the bus, so that MassTransit can create the exchanges and queues, the exchange OrderSystem.Events:OrderSubmitted will be created. However, until the bus has been started at least once, there won't be a queue bound to the exchange and any published messages will be lost. Once the bus has been started, the queue will remain bound to the exchange even when the bus is stopped.

Durable exchanges and queues remain configured on the virtual host, so even if the bus is stopped messages will continue to be routed to the queue. When the bus is restarted, queued messages will be consumed.

Transport Options

All RabbitMQ transport options can be configured using the .Host() method. The most commonly used settings can be configured via transport options.

services.AddOptions<RabbitMqTransportOptions>()
    .Configure(options =>
    {
        // configure options manually, but usually bind them to a configuration section
    });
PropertyTypeDescription
HoststringNetwork host name
PortushortNetwork port
ManagementPortushortManagement API port
VHoststringVirtual host name
UserstringUsername
PassstringPassword
UseSslboolTrue to use SSL/TLS

Host Configuration

MassTransit includes several RabbitMQ options that configure the behavior of the entire bus instance.

PropertyTypeDescription
PublisherConfirmationboolMassTransit will wait until RabbitMQ confirms messages when publishing or sending messages (default: true)
HeartbeatTimeSpanThe heartbeat interval used by the RabbitMQ client to keep the connection alive
RequestedChannelMaxushortThe maximum number of channels allowed on the connection
RequestedConnectionTimeoutTimeSpanThe connection timeout
ContinuationTimeoutTImeSpanSets the time the client will wait for the broker to response to RPC requests. Increase this value if you are experiencing timeouts from RabbitMQ due to a slow broker instance.

UseCluster

MassTransit can connect to a cluster of RabbitMQ virtual hosts and treat them as a single virtual host. To configure a cluster, call the UseCluster methods, and add the cluster nodes, each of which becomes part of the virtual host identified by the host name. Each cluster node can specify either a host or a host:port combination.

While this exists, it's generally preferable to configure something like HAProxy in front of a RabbitMQ cluster, instead of using MassTransit's built-in cluster configuration.

ConfigureBatchPublish

Note this was deprecated in v8.3.2 with the update to the new RabbitMQ.Client v7

MassTransit will briefly buffer messages before sending them to RabbitMQ, to increase message throughput. While use of the default values is recommended, the batch options can be configured.

PropertyTypeDefaultDescription
EnabledboolfalseEnable or disable batch sends to RabbitMQ
MessageLimitint100Limit the number of messages per batch
SizeLimitint64KA rough limit of the total message size
TimeoutTimeSpan1msThe time to wait for additional messages before sending
x.UsingRabbitMq((context, cfg) =>
{
    cfg.Host("localhost", h =>
    {
        h.ConfigureBatchPublish(x =>
        {
            x.Enabled = true;
            x.Timeout = TimeSpan.FromMilliseconds(2);
        });
    });
});

Endpoint Configuration

MassTransit includes several receive endpoint level configuration options that control receive endpoint behavior.

PropertyTypeDescription
PrefetchCountushortThe number of unacknowledged messages that can be processed concurrently (default based on CPU count)
PurgeOnStartupboolRemoves all messages from the queue when the bus is started (default: false)
AutoDeleteboolIf true, the queue will be automatically deleted when the bus is stopped (default: false)
DurableboolIf true, messages are persisted to disk before being acknowledged (default: true)

Quorum Queues

Quorum queues are a more reliable, replicated queue type supported by RabbitMQ. To specify the use of quorum queues, MassTransit can be configured at the individual receive endpoint level or globally using a configure endpoints callback.

To configure a receive endpoint to use a quorum queue:

x.UsingRabbitMq((context, cfg) =>
{
    cfg.ReceiveEndpoint("queue-name", e =>
    {
        e.SetQuorumQueue(3); // replication factor of 3
    });
});

To configure all receive endpoints using a configure endpoints callback:

services.AddMassTransit(x =>
{
    x.AddConfigureEndpointsCallback((name, cfg) =>
    {
        if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
            rmq.SetQuorumQueue(3);        
    });
});

Reply To Request Client

New in MassTransit v8.3.0

RabbitMQ provides a default ReplyTo address for every broker connection that can be used to send messages directly to the connection without the need to create a temporary queue. MassTransit supports use of the ReplyTo address for the request client.

By default, MassTransit will create a non-durable, auto-delete queue for the bus and use that queue for responses sent to the request client.

To configure MassTransit to use the ReplyTo address instead of the bus endpoint:

services.AddMassTransit(x =>
{
    x.SetRabbitMqReplyToRequestClientFactory();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

This will replace the default request client factory with the RabbitMQ-specific ReplyTo client factory, allowing responses to requests sent using the request client to to delivered using the broker connection's ReplyTo address.

Additional Examples

CloudAMQP

MassTransit can be used with CloudAMQP, which is a great SaaS-based solution to host your RabbitMQ broker. To configure MassTransit, the host and virtual host must be specified, and UseSsl must be configured.

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("wombat.rmq.cloudamqp.com", 5671, "your_vhost", h =>
        {
            h.Username("your_vhost");
            h.Password("your_password");

            h.UseSsl(s =>
            {
                s.Protocol = SslProtocols.Tls12;
            });
        });
    });
});

AmazonMQ - RabbitMQ

AmazonMQ now includes RabbitMQ support, which means the best message broker can now be used easily on AWS. To configure MassTransit, the AMQPS endpoint address can be used to configure the host as shown below.

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri("amqps://b-12345678-1234-1234-1234-123456789012.mq.us-east-2.amazonaws.com:5671"), h =>
        {
            h.Username("username");
            h.Password("password");
        });
    });
});