RabbitMQ Transport
RabbitMQ is an open-source message broker software that implements the Advanced Message Queuing Protocol (AMQP). It is written in the Erlang programming language and is built on the Open Telecom Platform framework for clustering and failover.
RabbitMQ can be used to decouple and distribute systems by sending messages between them. It supports a variety of messaging patterns, including point-to-point, publish/subscribe, and request/response.
RabbitMQ provides features such as routing, reliable delivery, and message persistence. It also has a built-in management interface that allows for monitoring and management of the broker, queues, and connections. Additionally, it supports various plugins, such as the RabbitMQ Management Plugin, that provide additional functionality.
Topology
The send and publish topologies are extended to support RabbitMQ features, and make it possible to configure how exchanges are created.
Exchanges
In RabbitMQ, an exchange is a component that receives messages from producers and routes them to one or more queues based on a set of rules called bindings. Exchanges are used to decouple the producer of a message from the consumer, by allowing messages to be sent to multiple queues and/or consumers.
There are several types of exchanges in RabbitMQ, each with its own routing algorithm:
Exchange Type | Routing Algorithm |
---|---|
Direct exchange | route messages to queues based on an exact match of the routing key |
Fanout exchange | route messages to all bound queues |
Topic exchange | route messages to queues based on a pattern match of the routing key |
Headers exchange | route messages to queues based on the headers of the message |
When a message is published to an exchange, the exchange applies the routing algorithm based on the routing key and the bindings to determine which queues the message should be sent to. The message is then sent to each of the queues that it matches.
Exchanges allow for more complex routing and message distribution strategies, as they allow to route messages based on different criteria, such as routing key, headers, or patterns.
When a message is published, MassTransit sends it to an exchange that is named based upon the message type. Using topology, the exchange name, as well as the exchange properties can be configured to support a custom behavior.
To configure the properties used when an exchange is created, the publish topology can be configured during bus creation:
cfg.Publish<OrderSubmitted>(x =>
{
x.Durable = false; // default: true
x.AutoDelete = true; // default: false
x.ExchangeType = "fanout"; // default, allows any valid exchange type
});
cfg.Publish<OrderEvent>(x =>
{
x.Exclude = true; // do not create an exchange for this type
});
Exchange Binding
To bind an exchange to a receive endpoint:
cfg.ReceiveEndpoint("input-queue", e =>
{
e.Bind("exchange-name");
e.Bind<MessageType>();
})
The above will create two exchange bindings, one between the exchange-name
exchange and the input-queue
exchange and a second between the exchange name
matching the MessageType
and the same input-queue
exchange.
The properties of the exchange binding may also be configured:
cfg.ReceiveEndpoint("input-queue", e =>
{
e.Bind("exchange-name", x =>
{
x.Durable = false;
x.AutoDelete = true;
x.ExchangeType = "direct";
x.RoutingKey = "8675309";
});
})
The above will create an exchange binding between the exchange-name
and the input-queue
exchange, using the configured properties.
RoutingKey
The routing key on published/sent messages can be configured by convention, allowing the same method to be used for messages which implement a common interface type. If no common type is shared, each message type may be configured individually using various conventional selectors. Alternatively, developers may create their own convention to fit their needs.
When configuring a bus, the send topology can be used to specify a routing key formatter for a particular message type.
public record SubmitOrder
{
public string CustomerType { get; init; }
public Guid TransactionId { get; init; }
// ...
}
cfg.Send<SubmitOrder>(x =>
{
// use customerType for the routing key
x.UseRoutingKeyFormatter(context => context.Message.CustomerType);
// multiple conventions can be set, in this case also CorrelationId
x.UseCorrelationId(context => context.Message.TransactionId);
});
// Keeping in mind that the default exchange config for your published type will be the full typename of your message
// we explicitly specify which exchange the message will be published to. So it lines up with the exchange we are binding our
// consumers too.
cfg.Message<SubmitOrder>(x => x.SetEntityName("submitorder"));
// Also if your publishing your message: because publishing a message will, by default, send it to a fanout queue.
// We specify that we are sending it to a direct queue instead. In order for the routingkeys to take effect.
cfg.Publish<SubmitOrder>(x => x.ExchangeType = ExchangeType.Direct);
The consumer could then be created:
public class OrderConsumer :
IConsumer<SubmitOrder>
{
public async Task Consume(ConsumeContext<SubmitOrder> context)
{
}
}
And then connected to a receive endpoint:
cfg.ReceiveEndpoint("priority-orders", x =>
{
x.ConfigureConsumeTopology = false;
x.Consumer<OrderConsumer>();
x.Bind("submitorder", s =>
{
s.RoutingKey = "PRIORITY";
s.ExchangeType = ExchangeType.Direct;
});
});
cfg.ReceiveEndpoint("regular-orders", x =>
{
x.ConfigureConsumeTopology = false;
x.Consumer<OrderConsumer>();
x.Bind("submitorder", s =>
{
s.RoutingKey = "REGULAR";
s.ExchangeType = ExchangeType.Direct;
});
});
This would split the messages sent to the exchange, by routing key, to the proper endpoint, using the CustomerType property.
Endpoint Address
A RabbitMQ endpoint address supports the following query string parameters:
Parameter | Type | Description | Implies |
---|---|---|---|
temporary | bool | Temporary endpoint | durable = false, autodelete = true |
durable | bool | Save messages to disk | |
autodelete | bool | Delete when bus is stopped | |
bind | bool | Bind exchange to queue | |
queue | string | Bind to queue name | bind = true |
type | string | Exchange type (fanout, direct, topic) | |
delayedtype | string | (Internal) delayed target exchange type | type = x-delayed-message |
alternateexchange | string | Alternate exchange name | |
bindexchange | string | Bind additional exchange | Queues Only |
singleactiveconsumer | bool | (Internal) Receive endpoint has a single active consumer | Queues Only |
Broker Topology
In this example topology, two commands and events are used.
First, the event contracts that are supported by an endpoint that receives files from a customer.
namespace Acme;
public interface FileReceived
{
Guid FileId { get; }
DateTime Timestamp { get; }
Uri Location { get; }
}
public interface CustomerDataReceived
{
DateTime Timestamp { get; }
string CustomerId { get; }
string SourceAddress { get; }
Uri Location { get; }
}
Second, the command contract for processing a file that was received.
namespace Acme;
public interface ProcessFile
{
Guid FileId { get; }
Uri Location { get; }
}
The above contracts are used by the consumers to receive messages. From a publishing or sending perspective, two classes are created by the event producer and the command sender which implement these interfaces.
namespace Acme;
public record FileReceivedEvent :
FileReceived,
CustomerDataReceived
{
public Guid FileId { get; init; }
public DateTime Timestamp { get; init; }
public Uri Location { get; init; }
public string CustomerId { get; init; }
public string SourceAddress { get; init; }
}
And the command class.
namespace Acme;
public record ProcessFileCommand :
ProcessFile
{
public Guid FileId { get; init; }
public Uri Location { get; init; }
}
The consumers for these message contracts are as below.
class FileReceivedConsumer :
IConsumer<FileReceived>
{
}
class CustomerAuditConsumer :
IConsumer<CustomerDataReceived>
{
}
class ProcessFileConsumer :
IConsumer<ProcessFile>
{
}
Publish
These are the exchanges and queues for the example above showing the topology for a Publish of a polymorphic message that uses inheritance:
Send
These are the exchanges and queues for the example above showing the topology for a Send:
Fault
These are the exchanges and queues used when messages fail. The failing message gets forwarded to an _error
queue by default. The following diagram shows which Exchanges and Queues are used when a message fails to be processed and is deadlettered for the example above.
Go to Exceptions to learn more on exception and faults
Retrying messages
The RabbitMQ Management UI has the ability to retry faulted messages when used in conjunction with the Shovel plugin. Faulted messages by default end up in the *_error
queue that corresponds with the consumer queue.
Before returning a message the message can be inspected by first fetching a message via the button Get Message(s)
which returns a raw view of the message properties and payload:
After inspection, all messsages currently stored in the error queue can be 'shoveled' back the the original queue.