Routing Slips
Developing applications with a distributed, message-based architecture adds complexity to handling transactions, especially when all steps must either succeed together or fail completely. In traditional applications using an ACID database, transactions are managed through SQL, where partial operations are rolled back if the transaction fails. However, this approach doesn’t scale well when the transaction spans multiple services or databases. In modern microservices architectures, the reliance on a single ACID database has become increasingly rare.
MassTransit Routing Slips address this challenge by enabling distributed transactions with fault compensation, designed to scale across a network of services. It provides functionality previously handled by database transactions, but adapted for distributed systems. Routing Slips also integrate seamlessly with saga state machines, which add capabilities for transaction monitoring and recoverability.
MassTransit implements the Routing Slip pattern, leveraging durable messaging transports and the advanced saga features of MassTransit. Routing slips simplify the coordination of distributed transactions. When combined with a saga state machine, routing slips create a robust, recoverable, and maintainable approach to message processing across multiple services.
Beyond basic routing slip functionality, MassTransit also supports compensations, allowing activities to store execution data so that reversible operations can be undone. Compensation can be achieved either through traditional rollbacks or by performing offsetting operations. For instance, an activity that reserves a seat for a customer could release that reservation if compensation is triggered.
Activities
In a MassTransit routing slip, an Activity refers to a processing step that can be added to a routing slip.
Compensating
To create an activity, create a class that implements the IActivity<TArguments, TLog>
interface for activities that support compensation or IExecuteActivity<TArguments>
for those that don't need compensation.
public class DownloadImageActivity :
IActivity<DownloadImageArguments, DownloadImageLog>
{
Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> context);
Task<CompensationResult> Compensate(CompensateContext<DownloadImageLog> context);
}
The IActivity<TArguments, TLog>
interface has two generic arguments. The first specifies the activity’s argument type and the second specifies the activity’s log type. In the example above, DownloadImageArguments
is the argument type and DownloadImageLog
is the log type. The type parameters may be an interface, class or record type. Where the type is a class or a record, the proper accessors should be specified (i.e. { get; set; }
or { get; init; }
).
Non-Compensating
public class DownloadImageActivity :
IExecuteActivity<DownloadImageArguments>
{
Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> context);
}
Implementation
Verb | Description |
---|---|
Execute | The primary action that the activity performs as part of the workflow. |
Compensate | The action the activity must take to undo or reverse its effects if any subsequent activities fail. |
Execute
Both IActivity
and IExecuteActivity
require you to implement the Execute
method. Execute is called while the routing slip is executing activities.
When Execute is called, the ExecuteContext
argument contains the activity arguments, the routing slip's TrackingNumber, and methods to complete or fault the activity. The actual routing slip message, as well as any details of the underlying infrastructure, are excluded to prevent coupling between the activity and the implementation. An example Execute method is shown below.
async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> execution)
{
DownloadImageArguments args = execution.Arguments;
string imageSavePath = Path.Combine(args.WorkPath,
execution.TrackingNumber.ToString());
await _httpClient.GetAndSave(args.ImageUri, imageSavePath);
return execution.Completed<DownloadImageLog>(new {ImageSavePath = imageSavePath});
}
Execution Results
After an activity finishes processing, it returns an ExecutionResult to the host. If the activity completes successfully, it can choose to store compensation data in an activity log, which is passed to the Completed method on the ExecuteContext
argument. If no compensation data is needed, the activity log is optional. In addition to compensation data, the activity can also add or modify variables stored in the routing slip for use by subsequent activities.
Result | Description |
---|---|
Complete | Indicates the activity completed successfully. |
Fault | Indicates the activity failed. |
Terminate | Indicates the routing slip should stop, but the process is considered complete. |
Completing
Complete
async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> execution)
{
DownloadImageArguments args = execution.Arguments;
string imageSavePath = Path.Combine(args.WorkPath,
execution.TrackingNumber.ToString());
await _httpClient.GetAndSave(args.ImageUri, imageSavePath);
// success with no compensation log
return execution.Completed();
}
Complete - With Compensation Log
async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> execution)
{
DownloadImageArguments args = execution.Arguments;
string imageSavePath = Path.Combine(args.WorkPath,
execution.TrackingNumber.ToString());
await _httpClient.GetAndSave(args.ImageUri, imageSavePath);
return execution.Completed<DownloadImageLog>(new {ImageSavePath = imageSavePath});
}
In the example above, the activity specifies the DownloadImageLog interface and initializes the log using an anonymous object. The object is then passed to the Completed method for storage in the routing slip before sending the routing slip to the next activity.
Revise Itinerary
An activity has complete control over the routing slip and can revise the itinerary to include additional activities.
async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> execution)
{
DownloadImageArguments args = execution.Arguments;
string imageSavePath = Path.Combine(args.WorkPath,
execution.TrackingNumber.ToString());
await _httpClient.GetAndSave(args.ImageUri, imageSavePath);
return execution.ReviseItinerary(builder =>
{
// add activity at the beginning of the current itinerary
builder.AddActivity("Deviation", new Uri($"exchange:{optionalAddress}"));
// maintain the existing activities
builder.AddActivitiesFromSourceItinerary();
// add activity at the end of the current itinerary
builder.AddActivity("Deviation", new Uri($"exchange:{optionalAddress}"));
});
}
Faulting
By default, if an activity throws an exception, it will be faulted and a RoutingSlipFaulted
event will be published (unless a subscription changes the rules). An activity can also return Faulted rather than throwing an exception.
Throwing an Exception
async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> execution)
{
DownloadImageArguments args = execution.Arguments;
string imageSavePath = Path.Combine(args.WorkPath,
execution.TrackingNumber.ToString());
await _httpClient.GetAndSave(args.ImageUri, imageSavePath);
// will throw an exception
var result = 100 / 0;
return execution.Completed();
}
In the example above, the activity will throw an exception which will result in a Fault
which will include data about the exception.
Explicitly
async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> execution)
{
DownloadImageArguments args = execution.Arguments;
string imageSavePath = Path.Combine(args.WorkPath,
execution.TrackingNumber.ToString());
await _httpClient.GetAndSave(args.ImageUri, imageSavePath);
return execution.Faulted();
}
In the example above, the activity can look at the data and then explicitly return a Faulted
result.
Terminating
In some situations, it may make sense to terminate the routing slip without executing any of the subsequent activities in the itinerary. This might be due to a business rule, in which the routing slip shouldn't be faulted, but needs to end immediately.
To terminate a routing slip, call Terminate as shown.
async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> execution)
{
// regular termination
return execution.Terminate();
}
An optional reason can also be specified.
async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> execution)
{
// terminate and include additional variables in the event
return execution.Terminate(new { Reason = "Not a good time, dude."});
}
Compensating
Only IActivity
requires you to implement the Compensate
method. Compensate is called when a subsequent activity has faulted so that the activity can undo or reverse its effects.
When an activity fails, the Compensate method is called for previously executed activities in the routing slip that stored compensation data. If an activity does not store any compensation data, the Compensate method is never called.
return context.Completed(new LogModel {
SomeData = "abc"
})
The compensation method for the example above is shown below.
Task<CompensationResult> Compensate(CompensateContext<DownloadImageLog> compensation)
{
DownloadImageLog log = compensation.Log;
File.Delete(log.ImageSavePath);
return compensation.Compensated();
}
Using the activity log data, the activity compensates by removing the downloaded image from the work directory. Once the activity has successfully compensated for the previous execution, it returns a CompensationResult by calling the Compensated method. If the compensation cannot be completed (due to logic issues or exceptions) and this results in a failure, the Failed method should be used, optionally providing an Exception.
Building a Routing Slip
A routing slip defines a sequence of processing steps, called activities, that are combined into a single itinerary. As each activity finishes, the routing slip is forwarded to the next activity in the itinerary. When all activities are completed, the routing slip is finalized, marking the transaction as complete.
One key advantage of using a routing slip is its flexibility—activities can vary for each transaction. Depending on factors like payment methods, billing/shipping address, or customer preferences, the routing slip builder can dynamically add activities. This is in contrast to the more rigid, predefined behavior of a state machine or sequential workflow, which are statically defined through code, a DSL, or frameworks like Windows Workflow.
Routing Slip Structure
A routing slip contains an itinerary, variables, and activity/compensation logs. It is defined by a message contract, which the underlying Courier components use to execute and compensate the transaction. The routing slip contract includes:
- A unique tracking number for each routing slip
- An itinerary, which is an ordered list of activities
- An activity log, recording an ordered list of previously executed activities
- A compensation log, listing previously executed activities that can be compensated if the routing slip faults
- A collection of variables, which can be mapped to activity arguments
- A collection of subscriptions for notifying consumers of routing slip events
- A collection of exceptions that may have occurred during routing slip execution
Routing Slip Builder
Instead of directly implementing the RoutingSlip message type, developers are encouraged to use a RoutingSlipBuilder to construct the routing slip. The RoutingSlipBuilder simplifies the process by providing methods to add activities (and their arguments), activity logs, and variables to the routing slip. For example, to create a routing slip with two activities and an additional variable, a developer might write:
var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DownloadImage", new Uri("rabbitmq://localhost/execute_downloadimage"),
new
{
ImageUri = new Uri("http://images.google.com/someImage.jpg")
});
builder.AddActivity("FilterImage", new Uri("rabbitmq://localhost/execute_filterimage"));
builder.AddVariable("WorkPath", @"\dfs\work");
var routingSlip = builder.Build();
Each activity requires a name for display purposes and a URI specifying the execution address. The execution address is where the routing slip should be sent to execute the activity. For each activity, arguments can be specified that are stored and presented to the activity via the activity arguments interface type specify by the first argument of the IActivity interface. The activities added to the routing slip are combined into an Itinerary, which is the list of activities to be executed, and stored in the routing slip.
Managing the inventory of available activities, as well as their names and execution addresses, is the responsibility of the application and is not part of the MassTransit Courier. Since activities are application specific, and the business logic to determine which activities to execute and in what order is part of the application domain, the details are left to the application developer.
Activity Arguments
Each activity declares an activity argument type, which must be an interface. When the routing slip is received by an activity host, the argument type is used to read data from the routing slip and deliver it to the activity.
The argument properties are mapped, by name, to the argument type from the routing slip using:
- Explicitly declared arguments, added to the itinerary with the activity
- Implicitly mapped arguments, added as variables to the routing slip
To specify an explicit activity argument, specify the argument value while adding the activity using the routing slip builder.
var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DownloadImage", new Uri("rabbitmq://localhost/execute_downloadimage"), new
{
ImageUri = new Uri("http://images.google.com/someImage.jpg")
});
To specify an implicit activity argument, add a variable to the routing slip with the same name/type as the activity argument.
var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DownloadImage", new Uri("rabbitmq://localhost/execute_downloadimage"));
builder.AddVariable("ImageUri", "http://images.google.com/someImage.jpg");
If an activity argument is not specified when the routing slip is created, it may be added by an activity that executes prior to the activity that requires the argument. For instance, if the DownloadImage activity stored the image in a local cache, that address could be added and used by another activity to access the cached image.
First, the routing slip would be built without the argument value.
var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DownloadImage", new Uri("rabbitmq://localhost/execute_downloadimage"));
builder.AddActivity("ProcessImage", new Uri("rabbitmq://localhost/execute_processimage"));
builder.AddVariable("ImageUri", "http://images.google.com/someImage.jpg");
Then, the first activity would add the variable to the routing slip on completion.
async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> context)
{
...
return context.CompletedWithVariables(new { ImagePath = ...});
}
The process image activity would then use that variable as an argument value.
async Task<ExecutionResult> Execute(ExecuteContext<ProcessImageArguments> context)
{
var path = context.Arguments.ImagePath;
}
Executing
Once built, the routing slip is executed, which sends it to the first activity’s execute URI. To make it easy and to ensure that source information is included, an extension method on IBus is available, the usage of which is shown below.
await bus.Execute(routingSlip);
It should be pointed out that if the address for the first activity is invalid or cannot be reached, an exception will be thrown by the Execute method.
Routing Slip Events
During routing slip execution, events are published when the routing slip completes or faults. Every event message includes the TrackingNumber as well as a Timestamp (in UTC, of course) indicating when the event occurred:
- RoutingSlipCompleted
- RoutingSlipFaulted
- RoutingSlipCompensationFailed
Additional events are published for each activity, including:
- RoutingSlipActivityCompleted
- RoutingSlipActivityFaulted
- RoutingSlipActivityCompensated
- RoutingSlipActivityCompensationFailed
By observing these events, an application can monitor and track the state of a routing slip. To maintain the current state, an Automatonymous state machine could be created. To maintain history, events could be stored in a database and then queried using the TrackingNumber of the routing slip.
Subscriptions
By default, routing slip events are published -- which means that any subscribed consumers will receive the events. While this is useful getting started, it can quickly get out of control as applications grow and multiple unrelated routing slips are used. To handle this, subscriptions were added (yes, added, because they weren't though of until we experienced this ourselves).
Subscriptions are added to the routing slip at the time it is built using the RoutingSlipBuilder
.
builder.AddSubscription(new Uri("rabbitmq://localhost/log-events"),
RoutingSlipEvents.All);
This subscription would send all routing slip events to the specified endpoint. If the application only wanted specified events, the events can be selected by specifying the enumeration values for those events. For example, to only get the RoutingSlipCompleted
and RoutingSlipFaulted
events, the following code would be used.
builder.AddSubscription(new Uri("rabbitmq://localhost/log-events"),
RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted);
It is also possible to tweak the content of the events to cut down on message size. For instance, by default, the RoutingSlipCompleted
event includes the variables from the routing slip. If the variables contained a large document, that document would be copied to the event. Eliminating the variables from the event would reduce the message size, thereby reducing the traffic on the message broker. To specify the contents of a routing slip event subscription, an additional argument is specified.
builder.AddSubscription(new Uri("rabbitmq://localhost/log-events"),
RoutingSlipEvents.Completed, RoutingSlipEventContents.None);
This would send the RoutingSlipCompleted
event to the endpoint, without any of the variables be included (only the main properties of the event would be present).
Once a subscription is added to a routing slip, events are no longer published -- they are only sent to the addresses specified in the subscriptions. However, multiple subscriptions can be specified -- the endpoints just need to be known at the time the routing slip is built.
Custom
It is also possible to specify a subscription with a custom event, a message that is created by the application developer. This makes it possible to create your own event types and publish them in response to routing slip events occurring. And this includes having the full context of a regular endpoint Send
so that any headers or context settings can be applied.
To create a custom event subscription, use the overload shown below.
// first, define the event type in your assembly
public record OrderProcessingCompleted
{
public Guid TrackingNumber { get; init; }
public DateTime Timestamp { get; init; }
public string OrderId { get; init; }
public string OrderApproval { get; init; }
}
// then, add the subscription with the custom properties
builder.AddSubscription(new Uri("rabbitmq://localhost/order-events"),
RoutingSlipEvents.Completed,
x => x.Send<OrderProcessingCompleted>(new
{
OrderId = "BFG-9000",
OrderApproval = "ComeGetSome"
}));
In the message contract above, there are four properties, but only two of them are specified. By default, the base RoutingSlipCompleted
event is created, and then the content of that event is merged into the message created in the subscription. This ensures that the dynamic values, such as the TrackingNumber
and the Timestamp
, which are present in the default event, are available in the custom event.
Custom events can also select with contents are merged with the custom event, using an additional method overload.