RabbitMQ Transport

RabbitMQ is an open-source message broker software that implements the Advanced Message Queuing Protocol (AMQP). It is written in the Erlang programming language and is built on the Open Telecom Platform framework for clustering and failover.

RabbitMQ can be used to decouple and distribute systems by sending messages between them. It supports a variety of messaging patterns, including point-to-point, publish/subscribe, and request/response.

RabbitMQ provides features such as routing, reliable delivery, and message persistence. It also has a built-in management interface that allows for monitoring and management of the broker, queues, and connections. Additionally, it supports various plugins, such as the RabbitMQ Management Plugin, that provide additional functionality.

Topology

The send and publish topologies are extended to support RabbitMQ features, and make it possible to configure how exchanges are created.

Exchanges

In RabbitMQ, an exchange is a component that receives messages from producers and routes them to one or more queues based on a set of rules called bindings. Exchanges are used to decouple the producer of a message from the consumer, by allowing messages to be sent to multiple queues and/or consumers.

There are several types of exchanges in RabbitMQ, each with its own routing algorithm:

| Exchange Type    | Routing Algorithm                                                     |
|------------------|-----------------------------------------------------------------------|
| Direct exchange  | Routes messages to queues based on an exact match of the routing key  |
| Fanout exchange  | Routes messages to all bound queues                                   |
| Topic exchange   | Routes messages to queues based on a pattern match of the routing key |
| Headers exchange | Routes messages to queues based on the headers of the message         |

When a message is published to an exchange, the exchange applies the routing algorithm based on the routing key and the bindings to determine which queues the message should be sent to. The message is then sent to each of the queues that it matches.

Exchanges enable more complex routing and message distribution strategies, because messages can be routed on different criteria, such as the routing key, headers, or patterns.
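To make the routing rules concrete, here is a minimal in-memory sketch of how direct, fanout, and topic exchanges select queues. It is an illustration only, not RabbitMQ or MassTransit code: the `ExchangeKind`, `Binding`, and `ExchangeRouter` names are hypothetical, headers exchanges are left out, and the topic matcher assumes the usual AMQP wildcards (`*` for exactly one word, `#` for zero or more words).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model types, introduced only for this illustration.
public enum ExchangeKind { Direct, Fanout, Topic }

public record Binding(string Queue, string BindingKey);

public static class ExchangeRouter
{
    // Returns the distinct queue names that would receive a message
    // published with the given routing key.
    public static IReadOnlyList<string> Route(
        ExchangeKind kind, IEnumerable<Binding> bindings, string routingKey)
    {
        return bindings
            .Where(b => kind switch
            {
                // Fanout ignores the routing key entirely.
                ExchangeKind.Fanout => true,
                // Direct requires an exact match.
                ExchangeKind.Direct => b.BindingKey == routingKey,
                // Topic matches dot-separated words against a pattern.
                ExchangeKind.Topic => TopicMatches(
                    b.BindingKey.Split('.'), 0, routingKey.Split('.'), 0),
                _ => false
            })
            .Select(b => b.Queue)
            .Distinct()
            .ToList();
    }

    // '*' matches exactly one word, '#' matches zero or more words.
    private static bool TopicMatches(string[] pattern, int p, string[] key, int k)
    {
        if (p == pattern.Length)
            return k == key.Length;

        if (pattern[p] == "#")
            return TopicMatches(pattern, p + 1, key, k)                      // '#' consumes nothing
                || (k < key.Length && TopicMatches(pattern, p, key, k + 1)); // '#' consumes one word

        if (k == key.Length)
            return false;

        return (pattern[p] == "*" || pattern[p] == key[k])
            && TopicMatches(pattern, p + 1, key, k + 1);
    }
}
```

With bindings for `PRIORITY` and `REGULAR`, a direct exchange delivers a message with routing key `PRIORITY` to only the first queue, while a fanout exchange delivers it to every bound queue regardless of the key.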

When a message is published, MassTransit sends it to an exchange named after the message type. Using topology, both the exchange name and the exchange properties can be configured to support custom behavior.

To configure the properties used when an exchange is created, the publish topology can be configured during bus creation:

cfg.Publish<OrderSubmitted>(x =>
{
    x.Durable = false; // default: true
    x.AutoDelete = true; // default: false
    x.ExchangeType = "fanout"; // default, allows any valid exchange type
});

cfg.Publish<OrderEvent>(x =>
{
    x.Exclude = true; // do not create an exchange for this type
});
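The snippets above assume a `cfg` configurator and only change exchange properties. As a rough sketch of where they fit, the following shows a complete bus factory that also overrides the exchange name through the message topology. The `localhost` host, the `guest` credentials, the `Sample` namespace, the `OrderSubmitted` record, and the `order-submitted` exchange name are all assumptions made for illustration; `SetEntityName` is the same call used in the routing key example later in this section.

```csharp
using System;
using MassTransit;

namespace Sample;

// Hypothetical message contract, used for illustration only.
public record OrderSubmitted
{
    public Guid OrderId { get; init; }
}

public static class BusSetup
{
    public static IBusControl CreateBus()
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            // Assumed local broker with the default guest account.
            cfg.Host("localhost", "/", h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            // Publish OrderSubmitted to an exchange named "order-submitted"
            // instead of the default name derived from the message type.
            cfg.Message<OrderSubmitted>(x => x.SetEntityName("order-submitted"));

            // Exchange properties are still set through the publish topology.
            cfg.Publish<OrderSubmitted>(x =>
            {
                x.Durable = true;
                x.AutoDelete = false;
            });
        });
    }
}
```

Applications that use the dependency injection container would normally place the same `cfg` calls inside their RabbitMQ configuration callback rather than calling the bus factory directly.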

Exchange Binding

To bind an exchange to a receive endpoint:

cfg.ReceiveEndpoint("input-queue", e =>
{
    e.Bind("exchange-name");
    e.Bind<MessageType>();
});

The above creates two exchange bindings: one from the exchange-name exchange to the input-queue exchange, and a second from the exchange whose name matches MessageType to the same input-queue exchange.

The properties of the exchange binding may also be configured:

cfg.ReceiveEndpoint("input-queue", e =>
{
    e.Bind("exchange-name", x =>
    {
        x.Durable = false;
        x.AutoDelete = true;
        x.ExchangeType = "direct";
        x.RoutingKey = "8675309";
    });
});

The above creates an exchange binding between the exchange-name exchange and the input-queue exchange, using the configured properties.

RoutingKey

The routing key on published/sent messages can be configured by convention, allowing the same method to be used for messages which implement a common interface type. If no common type is shared, each message type may be configured individually using various conventional selectors. Alternatively, developers may create their own convention to fit their needs.

When configuring a bus, the send topology can be used to specify a routing key formatter for a particular message type.

public record SubmitOrder
{
    public string CustomerType { get; init; }
    public Guid TransactionId { get; init; }
    // ...
}

cfg.Send<SubmitOrder>(x =>
{
    // use CustomerType for the routing key
    x.UseRoutingKeyFormatter(context => context.Message.CustomerType);

    // multiple conventions can be set, in this case also CorrelationId
    x.UseCorrelationId(context => context.Message.TransactionId);
});

// Keep in mind that the default exchange name for a published type is the full type name of the message,
// so we explicitly specify which exchange the message is published to. That way it lines up with the
// exchange we are binding our consumers to.
cfg.Message<SubmitOrder>(x => x.SetEntityName("submitorder"));

// Also, if you're publishing the message: by default, a published message is sent to a fanout exchange.
// We specify a direct exchange instead, so that the routing keys take effect.
cfg.Publish<SubmitOrder>(x => x.ExchangeType = ExchangeType.Direct);

The consumer could then be created:

public class OrderConsumer :
    IConsumer<SubmitOrder>
{
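    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        // handle the order here; context.Message.CustomerType holds the value used as the routing key
        return Task.CompletedTask;
    }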
}

And then connected to a receive endpoint:

cfg.ReceiveEndpoint("priority-orders", x =>
{
    x.ConfigureConsumeTopology = false;

    x.Consumer<OrderConsumer>();

    x.Bind("submitorder", s => 
    {
        s.RoutingKey = "PRIORITY";
        s.ExchangeType = ExchangeType.Direct;
    });
});

cfg.ReceiveEndpoint("regular-orders", x =>
{
    x.ConfigureConsumeTopology = false;

    x.Consumer<OrderConsumer>();

    x.Bind("submitorder", s => 
    {
        s.RoutingKey = "REGULAR";
        s.ExchangeType = ExchangeType.Direct;
    });
});

This would split the messages sent to the exchange, by routing key, to the proper endpoint, using the CustomerType property.
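As a sketch of the publishing side, the following publishes one order of each customer type. It assumes the `SubmitOrder` record and the bus configuration shown above, and that the bus has already been started; the `OrderPublisher` class name is hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical helper class, introduced only for this illustration.
public static class OrderPublisher
{
    public static async Task PublishOrders(IBus bus)
    {
        // The routing key formatter configured above turns CustomerType into
        // the routing key, so this message should reach "priority-orders".
        await bus.Publish(new SubmitOrder
        {
            CustomerType = "PRIORITY",
            TransactionId = Guid.NewGuid()
        });

        // This message should reach "regular-orders".
        await bus.Publish(new SubmitOrder
        {
            CustomerType = "REGULAR",
            TransactionId = Guid.NewGuid()
        });
    }
}
```

Note that a direct exchange discards messages whose routing key matches no binding unless an alternate exchange is configured, so an order with any other `CustomerType` value would not reach either endpoint in this setup.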

Endpoint Address

A RabbitMQ endpoint address supports the following query string parameters:

| Parameter            | Type   | Description                                              | Implies                             |
|----------------------|--------|----------------------------------------------------------|-------------------------------------|
| temporary            | bool   | Temporary endpoint                                       | durable = false, autodelete = true  |
| durable              | bool   | Save messages to disk                                    |                                     |
| autodelete           | bool   | Delete when bus is stopped                               |                                     |
| bind                 | bool   | Bind exchange to queue                                   |                                     |
| queue                | string | Bind to queue name                                       | bind = true                         |
| type                 | string | Exchange type (fanout, direct, topic)                    |                                     |
| delayedtype          | string | (Internal) delayed target exchange type                  | type = x-delayed-message            |
| alternateexchange    | string | Alternate exchange name                                  |                                     |
| bindexchange         | string | Bind additional exchange                                 | Queues Only                         |
| singleactiveconsumer | bool   | (Internal) Receive endpoint has a single active consumer | Queues Only                         |
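For illustration, an address using one of these parameters might be resolved as in the sketch below. The `localhost` host, the `input-queue` name, and the `AddressExample` class are assumptions, and `SubmitOrder` is the record defined in the routing key example earlier; how each parameter is interpreted may vary between MassTransit versions.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical helper class, introduced only for this illustration.
public static class AddressExample
{
    public static async Task SendToTemporaryEndpoint(ISendEndpointProvider provider)
    {
        // temporary=true implies durable = false and autodelete = true,
        // per the parameter table above.
        var address = new Uri("rabbitmq://localhost/input-queue?temporary=true");

        ISendEndpoint endpoint = await provider.GetSendEndpoint(address);

        await endpoint.Send(new SubmitOrder
        {
            CustomerType = "REGULAR",
            TransactionId = Guid.NewGuid()
        });
    }
}
```

The parameters in the address need to agree with how the exchange or queue was originally declared, since RabbitMQ rejects a redeclaration whose properties differ from those of the existing entity.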

Broker Topology

In this example topology, two events and one command are used.

First, the event contracts that are supported by an endpoint that receives files from a customer.

public interface FileReceived
{
    Guid FileId { get; }
    DateTime Timestamp { get; }
    Uri Location { get; }
}

public interface CustomerDataReceived
{
    DateTime Timestamp { get; }
    string CustomerId { get; }
    string SourceAddress { get; }
    Uri Location { get; }
}

Second, the command contract for processing a file that was received.

public interface ProcessFile
{
    Guid FileId { get; }
    Uri Location { get; }
}

The above contracts are used by the consumers to receive messages. From a publishing or sending perspective, two classes are created by the event producer and the command sender which implement these interfaces.

public record FileReceivedEvent :
    FileReceived,
    CustomerDataReceived
{
    public Guid FileId { get; init; }
    public DateTime Timestamp { get; init; }
    public Uri Location { get; init; }
    public string CustomerId { get; init; }
    public string SourceAddress { get; init; }
}

And the command class.

public record ProcessFileCommand :
    ProcessFile
{
    public Guid FileId { get; init; }   
    public Uri Location { get; init; }
}

The consumers for these message contracts are as below.

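class FileReceivedConsumer :
    IConsumer<FileReceived>
{
    // message handling omitted
    public Task Consume(ConsumeContext<FileReceived> context) => Task.CompletedTask;
}

class CustomerAuditConsumer :
    IConsumer<CustomerDataReceived>
{
    // message handling omitted
    public Task Consume(ConsumeContext<CustomerDataReceived> context) => Task.CompletedTask;
}

class ProcessFileConsumer :
    IConsumer<ProcessFile>
{
    // message handling omitted
    public Task Consume(ConsumeContext<ProcessFile> context) => Task.CompletedTask;
}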

Publish

The exchanges and queues configured for the event example are shown below.

MassTransit publishes messages to the message type exchange, and copies are routed to all the subscribers by RabbitMQ. This approach was based on an article on how to maximize routing performance in RabbitMQ.

[Figure: rabbitmq-publish-topology]

Send

The exchanges and queues for the send example are shown below.

[Figure: rabbitmq-send-topology]

Note that the broker topology can now be configured using the topology API.