Consumer Sagas
A consumer saga is a class, identified by a CorrelationId
, that defines the state persisted by a saga repository. Along with the state, interfaces may be added to the saga class to define the events handled by the saga. This combination of state and behavior in a single class is a consumer saga. In the example below, an order saga initiated by a SubmitOrder message is defined.
Interfaces
InitiatedBy
public record SubmitOrder :
CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; }
public DateTime OrderDate { get; init; }
}
public class OrderSaga :
ISaga,
InitiatedBy<SubmitOrder>
{
public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; }
public DateTime? AcceptDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context)
{
SubmitDate = context.Message.OrderDate;
}
}
When a SubmitOrder
message is received by the saga's receive endpoint, the CorrelationId
property is used to determine if an existing saga instance with that CorrelationId exists. If an existing instance is not found, the repository creates a new saga instance and calls the Consume method on the new instance. After the Consume method completes, the repository saves the newly created instance.
Orchestrates
To define an event orchestrated by an existing saga instance, such as OrderAccepted, an additional interface and method is specified.
public record OrderAccepted :
CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; }
public DateTime Timestamp { get; init; }
}
public class OrderSaga :
ISaga,
InitiatedBy<SubmitOrder>,
Orchestrates<OrderAccepted>,
{
public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; }
public DateTime? AcceptDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context) {...}
public async Task Consume(ConsumeContext<OrderAccepted> context)
{
AcceptDate = context.Message.Timestamp;
}
}
InitiatedByOrOrchestrates
To define an event that can initiate a new or orchestrate an existing saga instance, such as OrderInvoiced, an additional interface and method is specified.
public record OrderInvoiced :
CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; }
public DateTime Timestamp { get; init; }
public decimal Amount { get; init; }
}
public class OrderPaymentSaga :
ISaga,
InitiatedByOrOrchestrates<OrderInvoiced>
{
public Guid CorrelationId { get; set; }
public DateTime? InvoiceDate { get; set; }
public decimal? Amount { get; set; }
public async Task Consume(ConsumeContext<OrderInvoiced> context)
{
InvoiceDate = context.Message.Timestamp;
Amount = context.Message.Amount;
}
}
Observes
To define an event observed by an existing saga instance that does not implement the CorrelatedBy
public record OrderShipped
{
public Guid OrderId { get; init; }
public DateTime ShipDate { get; init; }
}
public class OrderSaga :
ISaga,
InitiatedBy<SubmitOrder>,
Orchestrates<OrderAccepted>,
Observes<OrderShipped, OrderSaga>
{
public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; }
public DateTime? AcceptDate { get; set; }
public DateTime? ShipDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context) {...}
public async Task Consume(ConsumeContext<OrderAccepted> context) {...}
public async Task Consume(ConsumeContext<OrderShipped> context)
{
ShipDate = context.Message.ShipDate;
}
public Expression<Func<OrderSaga, OrderShipped, bool>> CorrelationExpression =>
(saga,message) => saga.CorrelationId == message.OrderId;
}
Configuration
To add a saga when configuring MassTransit, use the AddSaga method shown below.
services.AddMassTransit(x =>
{
x.AddSaga<OrderSaga>()
.InMemoryRepository();
});
Supported saga repositories are documented in the configuration section.