Azure Service Bus Transport
Azure Service Bus is a messaging service from Microsoft Azure that allows for communication between decoupled systems. It offers a reliable and secure platform for asynchronous transfer of data and state. It supports a variety of messaging patterns, including queuing, publish/subscribe, and request/response.
With Service Bus, you can create messaging entities such as queues, topics, and subscriptions. Queues provide one-to-one messaging, where each message is consumed by a single receiver. Topics and subscriptions provide one-to-many messaging, where a message is delivered to multiple subscribers.
Service Bus also provides advanced features such as partitioning and auto-scaling, which allow for high availability and scalability. Additionally, it offers a dead letter queue, which is a special queue that stores undelivered or expired messages.
Topology
The send and publish topologies are extended to support the Azure Service Bus features, and make it possible to configure how topics are created.
Topics
An Azure Service Bus Topic is a messaging entity that allows for one-to-many messaging, where a message is delivered to multiple subscribers. Topics are built on top of Azure Service Bus Queues and provide additional functionality for publish/subscribe messaging patterns.
When a message is sent to a topic, it is automatically broadcast to all subscribers that have a subscription to that topic. Subscriptions are used to filter messages that are delivered to the subscribers. Subscribers can create multiple subscriptions to a topic, each with its own filter, to receive only the messages that are of interest to them.
Topics also provide a feature called Session-based messaging, which allows for guaranteed ordering of messages, and the ability to send and receive messages in a stateful manner.
Topics provide a robust and scalable messaging infrastructure for building distributed systems, where multiple services or systems can subscribe to a topic and receive messages that are relevant to them. Topics also support advanced features such as partitioning and auto-scaling, which allow for high availability and scalability.
To specify properties used when a topic is created, the publish topology can be configured during bus creation:
cfg.Publish<OrderSubmitted>(x =>
{
    x.EnablePartitioning = true;
});
PartitionKey
When publishing messages to an Azure Service Bus topic, you can use the PartitionKey property to specify a value that will be used to partition the messages across multiple topic partitions. This can be useful in situations where you want to ensure that related messages are always delivered to the same partition, and thus will be guaranteed to be processed in the order they were sent.
By setting a PartitionKey, all messages with the same key will be sent to the same partition, and thus will be received by consumers in the order they were sent. This is particularly useful when building distributed systems that require strict ordering of messages, such as event sourcing or stream processing.
Another use case for the PartitionKey is distributing a large number of messages across multiple partitions for better performance, so that the load is balanced across all the partitions.
When you use a PartitionKey, choose a key that results in an even distribution of messages across partitions, to avoid overloading a single partition.
The PartitionKey on published/sent messages can be configured by convention, allowing the same method to be used for messages which implement a common interface type. If no common type is shared, each message type may be configured individually using various conventional selectors. Alternatively, developers may create their own convention to fit their needs.
When configuring a bus, the send topology can be used to specify a partition key formatter for a particular message type.
public record SubmitOrder
{
    public string CustomerId { get; init; }
    public Guid TransactionId { get; init; }
}

cfg.Send<SubmitOrder>(x =>
{
    x.UsePartitionKeyFormatter(context => context.Message.CustomerId);
});
SessionId
When publishing messages to an Azure Service Bus Topic, you can use the SessionId property to specify a value that will be used to group messages together in a session. This can be useful in situations where you want to ensure that related messages are always delivered together, and thus will be guaranteed to be processed in the order they were sent.
A session is a logical container for messages, and all messages within a session have a guaranteed order of delivery. This means that messages with the same SessionId will be delivered in the order they were sent, regardless of the order they were received by the topic.
A common use case for sessions is when you have a set of related messages that need to be processed together. For example, if you are sending a series of commands to control a device, you would want to ensure that the commands are delivered in the order they were sent and that all related commands are delivered together.
Another use case for sessions is when you have a large number of messages and want to ensure that each consumer processes the messages in a specific order.
It's important to note that when you use sessions, the consumers must be able to process the messages in the order they were sent, otherwise messages might get stuck in the session and cause delays.
The SessionId on published/sent messages can be configured by convention, allowing the same method to be used for messages which implement a common interface type. If no common type is shared, each message type may be configured individually using various conventional selectors. Alternatively, developers may create their own convention to fit their needs.
When configuring a bus, the send topology can be used to specify a session ID formatter for a particular message type.
public record UpdateUserStatus
{
    public Guid UserId { get; init; }
    public string Status { get; init; }
}

cfg.Send<UpdateUserStatus>(x =>
{
    // The session ID is a string, so the Guid is converted explicitly
    x.UseSessionIdFormatter(context => context.Message.UserId.ToString());
});
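On the receiving side, session ordering only applies when the queue or subscription is session-enabled. The following is a minimal sketch of a session-aware receive endpoint, not a definitive setup. The endpoint name user-status, the UpdateUserStatusConsumer class, and the AddSessionBus helper with its services and connectionString parameters are assumptions made for illustration. It also assumes that RequiresSession is available on the receive endpoint configurator in the MassTransit version in use.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical consumer for the UpdateUserStatus message shown above
public class UpdateUserStatusConsumer :
    IConsumer<UpdateUserStatus>
{
    // Placeholder body; a real consumer would apply the status change here
    public Task Consume(ConsumeContext<UpdateUserStatus> context) => Task.CompletedTask;
}

public static class SessionConfiguration
{
    // Hypothetical helper; services and connectionString come from the host application
    public static void AddSessionBus(IServiceCollection services, string connectionString)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<UpdateUserStatusConsumer>();

            x.UsingAzureServiceBus((context, cfg) =>
            {
                cfg.Host(connectionString);

                cfg.ReceiveEndpoint("user-status", e =>
                {
                    // Assumption: marks the queue as session-enabled, so messages
                    // sharing a SessionId are received in order by one receiver at a time
                    e.RequiresSession = true;

                    e.ConfigureConsumer<UpdateUserStatusConsumer>(context);
                });
            });
        });
    }
}
```

With a session-enabled endpoint, every message delivered to it is expected to carry a SessionId, which is why the formatter is configured on the send topology.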
Subscriptions
In Azure, topics and topic subscriptions provide a mechanism for one-to-many communication (versus queues, which are designed for one-to-one). A topic subscription acts as a virtual queue. To subscribe to a topic subscription directly, the SubscriptionEndpoint should be used:
cfg.SubscriptionEndpoint<MessageType>("subscription-name", e =>
{
    e.ConfigureConsumer<MyConsumer>(provider);
});
A topic subscription's messages can also be forwarded to a receive endpoint (an Azure Service Bus queue), as shown here. Behind the scenes, MassTransit sets up Service Bus auto-forwarding between the topic subscription and the queue.
cfg.ReceiveEndpoint("input-queue", e =>
{
    e.Subscribe("topic-name");
    e.Subscribe<MessageType>();
});
The properties of the topic subscription may also be configured:
cfg.ReceiveEndpoint("input-queue", e =>
{
    e.Subscribe("topic-name", x =>
    {
        x.AutoDeleteOnIdle = TimeSpan.FromMinutes(60);
    });
});
Subscription Filters
MassTransit supports the configuration of subscription rules and filters, which can be used to filter messages as they are delivered to either the subscription endpoint or forwarded to the receive endpoint.
To specify a subscription filter:
cfg.ReceiveEndpoint("input-queue", e =>
{
    e.Subscribe("topic-name", x =>
    {
        x.Filter = new SqlRuleFilter("1 = 1");
    });
});
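A filter does not have to be a SQL expression. As a hedged sketch, a CorrelationRuleFilter from Azure.Messaging.ServiceBus.Administration can match on an application property instead. The FilterConfiguration class, the AddFilteredEndpoint helper, and the ClientId property name are assumptions made for illustration, and the Subscribe call mirrors the form used in the example above.

```csharp
using Azure.Messaging.ServiceBus.Administration;
using MassTransit;

public static class FilterConfiguration
{
    // Hypothetical helper, called from within the bus configuration callback
    public static void AddFilteredEndpoint(IServiceBusBusFactoryConfigurator cfg)
    {
        // Matches only messages whose application property "ClientId" equals 47
        var clientFilter = new CorrelationRuleFilter();
        clientFilter.ApplicationProperties["ClientId"] = 47;

        cfg.ReceiveEndpoint("input-queue", e =>
        {
            e.Subscribe("topic-name", x =>
            {
                x.Filter = clientFilter;
            });
        });
    }
}
```

Correlation filters compare property values for equality only, so a SqlRuleFilter remains the choice when ranges or compound conditions are needed.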
Saga State Machine Event Filter
This is an advanced scenario in which a saga state machine has an event that needs to filter messages from the topic via the subscription.
First, configure the event, which is defined in the saga state machine, so that it does not configure the consume topology.
public class FilteredSagaStateMachine :
    MassTransitStateMachine<FilteredSaga>
{
    public FilteredSagaStateMachine()
    {
        Event(() => FilteredEvent, x => x.ConfigureConsumeTopology = false);
    }

    public Event<Filtered> FilteredEvent { get; }
}
Note that this may cause the saga state machine to be difficult to unit test, since events will no longer be automatically routed to the saga's receive endpoint.
Next, add a saga definition for the saga and explicitly subscribe to the event type:
public class FilteredSagaDefinition :
    SagaDefinition<FilteredSaga>
{
    // Must override the base method; a new virtual method would never be called
    protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator,
        ISagaConfigurator<FilteredSaga> sagaConfigurator)
    {
        if (endpointConfigurator is IServiceBusReceiveEndpointConfigurator sb)
        {
            sb.Subscribe<Filtered>("subscription-name", x =>
            {
                x.Rule = new CreateRuleOptions("Only47", new SqlRuleFilter("ClientId = 47"));
            });
        }
    }
}
Finally, add the saga state machine and the definition when configuring MassTransit.
services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<FilteredSagaStateMachine, FilteredSaga, FilteredSagaDefinition>();
});
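For the rule ClientId = 47 to match, the broker message needs a ClientId application property, because unprefixed names in a Service Bus SQL filter refer to application properties. The sketch below shows one way a producer might supply it. The FilteredPublisher class is hypothetical, and the example assumes that MassTransit transfers send headers to Service Bus application properties.

```csharp
using System.Threading.Tasks;
using MassTransit;

// Hypothetical producer for the Filtered event used by the saga above
public class FilteredPublisher
{
    readonly IPublishEndpoint _publishEndpoint;

    public FilteredPublisher(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public Task PublishForClient(Filtered message, int clientId)
    {
        // Assumption: this header becomes an application property on the
        // broker message, which is what the subscription rule evaluates
        return _publishEndpoint.Publish(message, context => context.Headers.Set("ClientId", clientId));
    }
}
```

Under that assumption, calling PublishForClient(message, 47) produces a message that passes the Only47 rule, while any other client ID is filtered out at the subscription.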
Broker Topology
Two events and one command are used in this example.
These are the event contracts for a consumer that receives files from a customer:
namespace Acme;

public interface FileReceived
{
    Guid FileId { get; }
    DateTime Timestamp { get; }
    Uri Location { get; }
}

public interface CustomerDataReceived
{
    DateTime Timestamp { get; }
    string CustomerId { get; }
    string SourceAddress { get; }
    Uri Location { get; }
}
Here is the command contract for processing a file that was received.
namespace Acme;

public interface ProcessFile
{
    Guid FileId { get; }
    Uri Location { get; }
}
The above contracts are used by the consumers to receive messages. From a publishing or sending perspective, two classes are created by the event producer and the command sender which implement these interfaces.
namespace Acme;

public record FileReceivedEvent :
    FileReceived,
    CustomerDataReceived
{
    public Guid FileId { get; init; }
    public DateTime Timestamp { get; init; }
    public Uri Location { get; init; }
    public string CustomerId { get; init; }
    public string SourceAddress { get; init; }
}
And the command class:
namespace Acme;

public record ProcessFileCommand :
    ProcessFile
{
    public Guid FileId { get; init; }
    public Uri Location { get; init; }
}
The consumers for these message contracts are shown below:
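using System.Threading.Tasks;
using MassTransit;

namespace Acme;

class FileReceivedConsumer :
    IConsumer<FileReceived>
{
    // Placeholder body; a real consumer would handle the received file here
    public Task Consume(ConsumeContext<FileReceived> context) => Task.CompletedTask;
}

class CustomerAuditConsumer :
    IConsumer<CustomerDataReceived>
{
    // Placeholder body; a real consumer would record the audit entry here
    public Task Consume(ConsumeContext<CustomerDataReceived> context) => Task.CompletedTask;
}

class ProcessFileConsumer :
    IConsumer<ProcessFile>
{
    // Placeholder body; a real consumer would process the file here
    public Task Consume(ConsumeContext<ProcessFile> context) => Task.CompletedTask;
}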
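To make the two paths concrete, here is a hedged sketch of a producer that publishes the event and sends the command. The FileProducer class, its method name, and the queue name process-file are assumptions made for illustration; the contracts and message classes are the ones defined above.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

namespace Acme;

// Hypothetical producer; not part of the original example
public class FileProducer
{
    readonly IPublishEndpoint _publishEndpoint;
    readonly ISendEndpointProvider _sendEndpointProvider;

    public FileProducer(IPublishEndpoint publishEndpoint, ISendEndpointProvider sendEndpointProvider)
    {
        _publishEndpoint = publishEndpoint;
        _sendEndpointProvider = sendEndpointProvider;
    }

    public async Task NotifyAndProcess(Guid fileId, Uri location, string customerId, string sourceAddress)
    {
        // Publish: the event class implements both FileReceived and
        // CustomerDataReceived, so consumers of either contract can receive it
        await _publishEndpoint.Publish(new FileReceivedEvent
        {
            FileId = fileId,
            Timestamp = DateTime.UtcNow,
            Location = location,
            CustomerId = customerId,
            SourceAddress = sourceAddress
        });

        // Send: the command goes to one specific queue (the name is an assumption)
        var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("queue:process-file"));
        await endpoint.Send(new ProcessFileCommand { FileId = fileId, Location = location });
    }
}
```

Publishing routes through topics and reaches every subscribed consumer, whereas sending targets a single queue, which is the difference the Send and Publish topologies describe.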
Send
These are the topics and queues for the example above when Sending a message:
Publish
These are the topics and queues for the example above when Publishing a polymorphic message that uses inheritance:
Fault
These are the topics and queues used when messages fail. The failing message is forwarded to an _error queue by default. The following diagram shows which topics and queues are used when a message fails to be processed and is dead-lettered for the example above.
See Exceptions to learn more about exceptions and faults.
Retrying messages
The Azure Service Bus Portal provides a method to retry faulted messages by doing the following:
- Open the Service Bus namespace
- Select the queue that has failed messages
- Select 'Service Bus Explorer'
- Select the 'Dead-letter' tab
This opens a view of the dead-letter queue with an option to select one or more messages. The selected messages can be retried by selecting Re-send selected messages.