RabbitMQ Configuration
With tens of thousands of users, RabbitMQ is one of the most popular open source message brokers. RabbitMQ is lightweight and easy to deploy on premises and in the cloud. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.
MassTransit fully supports RabbitMQ, including many of the advanced features and capabilities.
Minimal Example
In the example below, which configures a receive endpoint, consumer, and message type, the bus is configured to use RabbitMQ.
namespace RabbitMqConsoleListener;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Hosting;
public static class Program
{
public static async Task Main(string[] args)
{
await Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
});
});
})
.Build()
.RunAsync();
}
}
Broker Topology
With RabbitMQ, which supports exchanges and queues, messages are sent or published to exchanges and RabbitMQ routes those messages through exchanges to the appropriate queues.
When the bus is started, MassTransit will create exchanges and queues on the virtual host for the receive endpoint. MassTransit creates durable, fanout exchanges by default, and queues are also durable by default.
Configuration
The configuration includes:
- The RabbitMQ host
- Host name:
localhost
- Virtual host:
/
- User name and password used to connect to the virtual host (credentials are virtual-host specific)
- Host name:
- The receive endpoint
- Queue name:
order-events-listener
- Consumer:
OrderSubmittedEventConsumer
- Message type:
OrderSystem.Events.OrderSubmitted
- Message type:
- Queue name:
Name | Description |
---|---|
order-events-listener | Queue for the receive endpoint |
order-events-listener | An exchange, bound to the queue, used to send messages |
OrderSystem.Events:OrderSubmitted | An exchange, named by the message-type, bound to the order-events-listener exchange, used to publish messages |
When a message is sent, the endpoint address can be one of two values:
exchange:order-events-listener
Send the message to the order-events-listener exchange. If the exchange does not exist, it will be created. MassTransit translates topic: to exchange: when using RabbitMQ, so that topic: addresses can be resolved – since RabbitMQ is the only supported transport that doesn't have topics.
queue:order-events-listener
Send the message to the order-events-listener exchange. If the exchange or queue does not exist, they will be created and the exchange will be bound to the queue.
With either address, RabbitMQ will route the message from the order-events-listener exchange to the order-events-listener queue.
When a message is published, the message is sent to the OrderSystem.Events:OrderSubmitted exchange. If the exchange does not exist, it will be created. RabbitMQ will route the message from the OrderSystem.Events:OrderSubmitted exchange to the order-events-listener exchange, and subsequently to the order-events-listener queue. If other receive endpoints connected to the same virtual host include consumers that consume the OrderSubmitted message, a copy of the message would be routed to each of those endpoints as well.
Durable exchanges and queues remain configured on the virtual host, so even if the bus is stopped messages will continue to be routed to the queue. When the bus is restarted, queued messages will be consumed.
Transport Options
All RabbitMQ transport options can be configured using the .Host()
method. The most commonly used settings can be configured via transport options.
services.AddOptions<RabbitMqTransportOptions>()
.Configure(options =>
{
// configure options manually, but usually bind them to a configuration section
});
Property | Type | Description |
---|---|---|
Host | string | Network host name |
Port | ushort | Network port |
ManagementPort | ushort | Management API port |
VHost | string | Virtual host name |
User | string | Username |
Pass | string | Password |
UseSsl | bool | True to use SSL/TLS |
Host Configuration
MassTransit includes several RabbitMQ options that configure the behavior of the entire bus instance.
Property | Type | Description |
---|---|---|
PublisherConfirmation | bool | MassTransit will wait until RabbitMQ confirms messages when publishing or sending messages (default: true) |
Heartbeat | TimeSpan | The heartbeat interval used by the RabbitMQ client to keep the connection alive |
RequestedChannelMax | ushort | The maximum number of channels allowed on the connection |
RequestedConnectionTimeout | TimeSpan | The connection timeout |
ContinuationTimeout | TImeSpan | Sets the time the client will wait for the broker to response to RPC requests. Increase this value if you are experiencing timeouts from RabbitMQ due to a slow broker instance. |
UseCluster
MassTransit can connect to a cluster of RabbitMQ virtual hosts and treat them as a single virtual host. To configure a cluster, call the UseCluster
methods, and add the cluster nodes, each of which becomes part of the virtual host identified by the host name. Each cluster node can specify either a host
or a host:port
combination.
While this exists, it's generally preferable to configure something like HAProxy in front of a RabbitMQ cluster, instead of using MassTransit's built-in cluster configuration.
ConfigureBatchPublish
MassTransit will briefly buffer messages before sending them to RabbitMQ, to increase message throughput. While use of the default values is recommended, the batch options can be configured.
Property | Type | Default | Description |
---|---|---|---|
Enabled | bool | false | Enable or disable batch sends to RabbitMQ |
MessageLimit | int | 100 | Limit the number of messages per batch |
SizeLimit | int | 64K | A rough limit of the total message size |
Timeout | TimeSpan | 1ms | The time to wait for additional messages before sending |
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", h =>
{
h.ConfigureBatchPublish(x =>
{
x.Enabled = true;
x.Timeout = TimeSpan.FromMilliseconds(2);
});
});
});
Endpoint Configuration
MassTransit includes several receive endpoint level configuration options that control receive endpoint behavior.
Property | Type | Description |
---|---|---|
PrefetchCount | ushort | The number of unacknowledged messages that can be processed concurrently (default based on CPU count) |
PurgeOnStartup | bool | Removes all messages from the queue when the bus is started (default: false) |
AutoDelete | bool | If true, the queue will be automatically deleted when the bus is stopped (default: false) |
Durable | bool | If true, messages are persisted to disk before being acknowledged (default: true) |
Quorum Queues
Quorum queues are a more reliable, replicated queue type supported by RabbitMQ. To specify the use of quorum queues, MassTransit can be configured at the individual receive endpoint level or globally using a configure endpoints callback.
To configure a receive endpoint to use a quorum queue:
x.UsingRabbitMq((context, cfg) =>
{
cfg.ReceiveEndpoint("queue-name", e =>
{
e.SetQuorumQueue(3); // replication factor of 3
});
});
To configure all receive endpoints using a configure endpoints callback:
services.AddMassTransit(x =>
{
x.AddConfigureEndpointsCallback((name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
rmq.SetQuorumQueue(3);
});
});
Additional Examples
CloudAMQP
MassTransit can be used with CloudAMQP, which is a great SaaS-based solution to host your RabbitMQ broker. To configure MassTransit, the host and virtual host must be specified, and UseSsl must be configured.
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("wombat.rmq.cloudamqp.com", 5671, "your_vhost", h =>
{
h.Username("your_vhost");
h.Password("your_password");
h.UseSsl(s =>
{
s.Protocol = SslProtocols.Tls12;
});
});
});
});
AmazonMQ - RabbitMQ
AmazonMQ now includes RabbitMQ support, which means the best message broker can now be used easily on AWS. To configure MassTransit, the AMQPS endpoint address can be used to configure the host as shown below.
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri("amqps://b-12345678-1234-1234-1234-123456789012.mq.us-east-2.amazonaws.com:5671"), h =>
{
h.Username("username");
h.Password("password");
});
});
});