Transactional Outbox Configuration
The Transactional Outbox is explained in the patterns section. This section covers how to configure the transactional outbox with any of the supported databases.
Bus Outbox Options
The bus outbox has its own configuration settings, which are common across all supported databases.
Setting | Description |
---|---|
MessageDeliveryLimit | The number of messages to deliver at a time from the outbox to the broker |
MessageDeliveryTimeout | The transport send timeout applied when delivering messages to the broker |
DisableDeliveryService() | Disable the outbox message delivery service, removing the hosted service from the service collection |
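As a rough sketch of how these settings might be applied, they can be set in a callback passed to UseBusOutbox. The setting names come from the table above. The helper class `BusOutboxSetup`, the chosen values, and the assumption that `UseBusOutbox` accepts a configuration callback are illustrative and should be checked against the MassTransit version in use. `RegistrationDbContext` is the DbContext shown in the Entity Framework configuration section.

```csharp
using System;
using MassTransit;

public static class BusOutboxSetup
{
    // Hypothetical helper, meant to be called from inside services.AddMassTransit(x => ...).
    public static void Configure(IBusRegistrationConfigurator x)
    {
        x.AddEntityFrameworkOutbox<RegistrationDbContext>(o =>
        {
            o.UsePostgres();

            o.UseBusOutbox(bo =>
            {
                // Illustrative values, not recommendations or library defaults.
                bo.MessageDeliveryLimit = 50;
                bo.MessageDeliveryTimeout = TimeSpan.FromSeconds(10);

                // Uncomment if this process should only write to the outbox,
                // for example when delivery is assumed to run elsewhere.
                // bo.DisableDeliveryService();
            });
        });
    }
}
```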
Entity Framework Outbox
The Transactional Outbox for Entity Framework Core uses three tables in the DbContext
to store messages that are subsequently delivered to the message broker.
Table | Description |
---|---|
InboxState | Tracks received messages by MessageId for each endpoint |
OutboxMessage | Stores messages published or sent using ConsumeContext, IPublishEndpoint, and ISendEndpointProvider |
OutboxState | Tracks delivery of outbox messages by the delivery service (similar to InboxState but for messages sent outside of a consumer via the bus outbox) |
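To make the role of the OutboxMessage table more concrete, here is a minimal sketch of publishing through the bus outbox from application code. The `RegistrationSubmitted` contract, the `Registration` entity, the `RegistrationService` class, and the `Sample` namespace are hypothetical names introduced for illustration. `RegistrationDbContext` is the DbContext from the Configuration section below, and the sketch assumes it also maps the `Registration` entity.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

namespace Sample;

// Hypothetical message contract and entity, used only for illustration.
public record RegistrationSubmitted(Guid RegistrationId);

public class Registration
{
    public Guid Id { get; set; }
}

// Assumed to be registered as a scoped service so that it shares
// the DbContext instance used by the bus outbox.
public class RegistrationService
{
    readonly RegistrationDbContext _dbContext;
    readonly IPublishEndpoint _publishEndpoint;

    public RegistrationService(RegistrationDbContext dbContext, IPublishEndpoint publishEndpoint)
    {
        _dbContext = dbContext;
        _publishEndpoint = publishEndpoint;
    }

    public async Task Submit(Guid registrationId)
    {
        // Assumes Registration is mapped in RegistrationDbContext.
        _dbContext.Set<Registration>().Add(new Registration { Id = registrationId });

        // With the bus outbox enabled, the message is held as an OutboxMessage
        // instead of being sent to the broker immediately.
        await _publishEndpoint.Publish(new RegistrationSubmitted(registrationId));

        // The entity and the outbox message are saved together; the delivery
        // service later forwards the stored message to the broker.
        await _dbContext.SaveChangesAsync();
    }
}
```

The point of this arrangement is that the business data and the outgoing message are committed in the same database transaction, so neither is persisted without the other.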
Configuration
The code below is based upon the sample application.
The outbox components are included in the MassTransit.EntityFrameworkCore NuGet package. The code below configures both the bus outbox and the consumer outbox using the default settings. In this case, PostgreSQL is the database engine.
x.AddEntityFrameworkOutbox<RegistrationDbContext>(o =>
{
    // configure which database lock provider to use (Postgres, SqlServer, or MySql)
    o.UsePostgres();

    // enable the bus outbox
    o.UseBusOutbox();
});
To configure the DbContext with the appropriate tables, use the extension methods shown below:
public class RegistrationDbContext :
    DbContext
{
    public RegistrationDbContext(DbContextOptions<RegistrationDbContext> options)
        : base(options)
    {
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);

        modelBuilder.AddInboxStateEntity();
        modelBuilder.AddOutboxMessageEntity();
        modelBuilder.AddOutboxStateEntity();
    }
}
To configure the outbox on a receive endpoint, add it to the endpoint's configuration. The example below does this in a SagaDefinition, which is added to MassTransit along with the saga state machine.
public class RegistrationStateDefinition :
    SagaDefinition<RegistrationState>
{
    protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator,
        ISagaConfigurator<RegistrationState> sagaConfigurator, IRegistrationContext context)
    {
        endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 500, 1000, 1000, 1000, 1000, 1000));

        endpointConfigurator.UseEntityFrameworkOutbox<RegistrationDbContext>(context);
    }
}
The definition is added with the saga state machine:
x.AddSagaStateMachine<RegistrationStateMachine, RegistrationState, RegistrationStateDefinition>()
    .EntityFrameworkRepository(r =>
    {
        r.ExistingDbContext<RegistrationDbContext>();
        r.UsePostgres();
    });
The Entity Framework outbox adds a hosted service which removes delivered InboxState entries after the DuplicateDetectionWindow has elapsed. The Bus Outbox includes an additional hosted service that delivers the outbox messages to the broker.
The outbox can also be added to all consumers using a configure endpoints callback:
x.AddConfigureEndpointsCallback((context, name, cfg) =>
{
    cfg.UseEntityFrameworkOutbox<RegistrationDbContext>(context);
});
Configuration Options
The available outbox settings are listed below.
Setting | Description |
---|---|
DuplicateDetectionWindow | The amount of time a message remains in the inbox for duplicate detection (based on MessageId) |
IsolationLevel | The transaction isolation level to use (Serializable by default) |
LockStatementProvider | The lock statement provider, needed to execute pessimistic locks. Set via UsePostgres, UseSqlServer (the default), or UseMySql |
QueryDelay | The delay between queries once messages are no longer available. When a query returns messages, subsequent queries are performed until no messages are returned, after which the QueryDelay is used. |
QueryMessageLimit | The maximum number of messages to query from the database at a time |
QueryTimeout | The database query timeout |
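The settings above can be assigned inside the AddEntityFrameworkOutbox callback. The following is a sketch only: the helper class `OutboxOptionsSetup` is hypothetical, the values are arbitrary examples rather than defaults or recommendations, and the exact property types should be confirmed against the MassTransit version in use.

```csharp
using System;
using MassTransit;

public static class OutboxOptionsSetup
{
    // Hypothetical helper, meant to be called from inside services.AddMassTransit(x => ...).
    public static void Configure(IBusRegistrationConfigurator x)
    {
        x.AddEntityFrameworkOutbox<RegistrationDbContext>(o =>
        {
            // Selects the lock statement provider for PostgreSQL.
            o.UsePostgres();

            // Keep inbox entries for 30 minutes for duplicate detection.
            o.DuplicateDetectionWindow = TimeSpan.FromMinutes(30);

            // Stated explicitly here; the table lists Serializable as the default.
            o.IsolationLevel = System.Data.IsolationLevel.Serializable;

            // Wait one second between queries once the outbox is empty.
            o.QueryDelay = TimeSpan.FromSeconds(1);

            // Read at most 100 messages per query.
            o.QueryMessageLimit = 100;

            // Give each database query up to 30 seconds.
            o.QueryTimeout = TimeSpan.FromSeconds(30);

            o.UseBusOutbox();
        });
    }
}
```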
MongoDB Outbox
Configuration
The code below is based upon the sample application.
The outbox components are included in the MassTransit.MongoDb NuGet package. The code below configures both the bus outbox and the consumer outbox using the default settings.
x.AddMongoDbOutbox(o =>
{
    o.QueryDelay = TimeSpan.FromSeconds(1);

    o.ClientFactory(provider => provider.GetRequiredService<IMongoClient>());
    o.DatabaseFactory(provider => provider.GetRequiredService<IMongoDatabase>());

    o.DuplicateDetectionWindow = TimeSpan.FromSeconds(30);

    o.UseBusOutbox();
});
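The ClientFactory and DatabaseFactory callbacks above resolve IMongoClient and IMongoDatabase from the service provider, so both need to be registered in the container. One possible registration is sketched below. The extension method name `AddMongoForOutbox`, the connection string, and the database name are placeholders chosen for illustration.

```csharp
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver;

public static class MongoRegistration
{
    // Hypothetical extension method; call it before AddMassTransit.
    public static IServiceCollection AddMongoForOutbox(this IServiceCollection services)
    {
        // Placeholder connection string; replace with the real server address.
        services.AddSingleton<IMongoClient>(_ => new MongoClient("mongodb://localhost:27017"));

        // Placeholder database name; the outbox resolves this instance via DatabaseFactory.
        services.AddSingleton<IMongoDatabase>(provider =>
            provider.GetRequiredService<IMongoClient>().GetDatabase("registration"));

        return services;
    }
}
```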
To configure the transactional outbox for a specific consumer, use a consumer definition:
public class ValidateRegistrationConsumerDefinition :
    ConsumerDefinition<ValidateRegistrationConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ValidateRegistrationConsumer> consumerConfigurator, IRegistrationContext context)
    {
        endpointConfigurator.UseMessageRetry(r => r.Intervals(10, 50, 100, 1000, 1000, 1000, 1000, 1000));

        endpointConfigurator.UseMongoDbOutbox(context);
    }
}
To configure the transactional outbox for all configured receive endpoints, use a configure endpoints callback:
x.AddConfigureEndpointsCallback((context, name, cfg) =>
{
    cfg.UseMongoDbOutbox(context);
});