Middleware Filters
Kill Switch
A Kill Switch is used to prevent failing consumers from moving all the messages from the input queue to the error queue. By monitoring message consumption and tracking message successes and failures, a Kill Switch stops the receive endpoint when a trip threshold has been reached.
Typically, consumer exceptions are caused by transient issues, and suspending consumption until a later time gives the transient issue a chance to be resolved.
Read Martin Fowler's description of the pattern here.
UseKillSwitch
A Kill Switch can be configured on an individual receive endpoint or all receive endpoints on the bus. To configure a kill switch on all receive endpoints, add the UseKillSwitch method as shown.
cfg.UseKillSwitch(options => options
    .SetActivationThreshold(10)
    .SetTripThreshold(0.15)
    .SetRestartTimeout(m: 1));
In the above example, the kill switch will activate after 10 messages have been consumed. If the ratio of failures/attempts exceeds 15%, the kill switch will trip and stop the receive endpoint. After 1 minute, the receive endpoint will be restarted. Once restarted, if exceptions are still observed, the receive endpoint will be stopped again for 1 minute.
A kill switch may be configured on the bus or on individual receive endpoint(s). When configured on the bus, the kill switch is applied to all receive endpoints.
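The same options can be applied to a single receive endpoint instead of the whole bus. The fragment below is a sketch that assumes UseKillSwitch is exposed on the receive endpoint configurator with the same options as the bus-level call. The queue name order-queue is hypothetical.

```csharp
// Sketch: a kill switch scoped to one receive endpoint.
// "order-queue" is a hypothetical queue name; the option values mirror the bus-level example.
cfg.ReceiveEndpoint("order-queue", e =>
{
    e.UseKillSwitch(options => options
        .SetActivationThreshold(10)
        .SetTripThreshold(0.15)
        .SetRestartTimeout(m: 1));

    // consumer configuration for this endpoint goes here
});
```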
Options
Option | Description |
---|---|
TrackingPeriod | The time window for tracking exceptions |
TripThreshold | The percentage of failed messages that triggers the kill switch. Accepts 0-100, though realistic values are closer to 5-10. |
ActivationThreshold | The number of messages that must be consumed before the kill switch activates. |
RestartTimeout | The wait time before restarting the receive endpoint |
ExceptionFilter | By default, all exceptions are tracked. An exception filter can be configured to only track specific exceptions. |
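The interaction between the activation threshold and the trip threshold can be sketched as a small check. This is an illustration only, not MassTransit's internal logic: the names KillSwitchSketch and ShouldTrip are hypothetical, and the boundary handling (active once the attempt count reaches the threshold, trip only when the ratio strictly exceeds it) is an assumption.

```csharp
using System;

public static class KillSwitchSketch
{
    // Illustrative only: not MassTransit's internal logic.
    // attempts and failures are the counts observed in the current tracking period.
    // tripThreshold is a fraction (0.15 = 15%), matching the UseKillSwitch example.
    public static bool ShouldTrip(int attempts, int failures, int activationThreshold, double tripThreshold)
    {
        // Assumption: the switch is inactive until the activation threshold is reached.
        if (attempts <= 0 || attempts < activationThreshold)
            return false;

        double failureRatio = (double)failures / attempts;

        // Assumption: the ratio must strictly exceed the threshold to trip.
        return failureRatio > tripThreshold;
    }
}
```

With an activation threshold of 10 and a trip threshold of 0.15, ShouldTrip(5, 5, 10, 0.15) is false because the switch is not yet active, while ShouldTrip(10, 2, 10, 0.15) is true because 20% exceeds 15%.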
Circuit Breaker
A circuit breaker is used to protect resources (remote, local, or otherwise) from being overloaded when in a failure state. For example, a remote web site may be unavailable, and calling that web site in a message consumer takes 30-60 seconds to time out. By continuing to call the failing service, the consumer may prevent the service from recovering. A circuit breaker detects the repeated failures and trips, preventing further calls to the service and giving it time to recover. Once the reset interval expires, calls are slowly allowed back to the service. If it is still failing, the breaker remains open and the reset interval starts over. Once the service returns to healthy, the breaker closes and calls flow normally.
Read Martin Fowler's description of the pattern here.
UseCircuitBreaker
To add the circuit breaker to a receive endpoint:
cfg.UseCircuitBreaker(cb =>
{
    cb.TrackingPeriod = TimeSpan.FromMinutes(1);
    cb.TripThreshold = 15;
    cb.ActiveThreshold = 10;
    cb.ResetInterval = TimeSpan.FromMinutes(5);
});
Options
There are four options that can be adjusted on a circuit breaker.
Option | Description |
---|---|
TrackingPeriod | The time window for tracking exceptions |
TripThreshold | A percentage based on the ratio of failed attempts to total attempts. When set to 15, if the ratio exceeds 15%, the circuit breaker opens and remains open until the ResetInterval expires. |
ActiveThreshold | The number of messages that must reach the circuit breaker in a tracking period before the circuit breaker can trip. If set to 10, the trip threshold is not evaluated until at least 10 messages have been received. |
ResetInterval | The period of time between the circuit breaker trip and the first attempt to close the circuit breaker. Messages that reach the circuit breaker during the open period will immediately fail with the same exception that tripped the circuit breaker. |
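The closed, open, and half-open cycle described for the circuit breaker can be sketched as a small state machine. This is a simplified, single-threaded illustration under stated assumptions, not MassTransit's implementation: BreakerSketch and BreakerState are hypothetical names, the tracking period is omitted, and a single trial call is allowed through after the reset interval rather than a gradual ramp-up.

```csharp
using System;

public enum BreakerState { Closed, Open, HalfOpen }

// Illustrative, single-threaded sketch; the tracking period is omitted for brevity.
public sealed class BreakerSketch
{
    private readonly int _tripThreshold;       // percentage, e.g. 15
    private readonly int _activeThreshold;     // attempts required before evaluating
    private readonly TimeSpan _resetInterval;
    private int _attempts;
    private int _failures;
    private DateTime _lastTransition;

    public BreakerState State { get; private set; } = BreakerState.Closed;

    public BreakerSketch(int tripThreshold, int activeThreshold, TimeSpan resetInterval)
    {
        _tripThreshold = tripThreshold;
        _activeThreshold = activeThreshold;
        _resetInterval = resetInterval;
    }

    // Returns true if a call may proceed at the given time.
    public bool Allow(DateTime now)
    {
        // Once the reset interval has elapsed, let a trial call through (half-open).
        if (State == BreakerState.Open && now - _lastTransition >= _resetInterval)
            State = BreakerState.HalfOpen;

        return State != BreakerState.Open;
    }

    // Records the outcome of a call that was allowed through.
    public void Record(bool success, DateTime now)
    {
        if (State == BreakerState.Open)
            return;

        if (State == BreakerState.HalfOpen)
        {
            // A successful trial closes the breaker; a failed one reopens it
            // and restarts the reset interval.
            Transition(success ? BreakerState.Closed : BreakerState.Open, now);
            return;
        }

        _attempts++;
        if (!success) _failures++;

        // Assumption: evaluate only once enough attempts were seen, and trip
        // when the failure percentage strictly exceeds the threshold.
        if (_attempts >= _activeThreshold && _failures * 100.0 / _attempts > _tripThreshold)
            Transition(BreakerState.Open, now);
    }

    private void Transition(BreakerState state, DateTime now)
    {
        State = state;
        _lastTransition = now;
        _attempts = 0;
        _failures = 0;
    }
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real filter would read a clock and would need to be thread-safe.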
Rate Limiter
A rate limiter is used to restrict the number of messages processed within a time period. The reason may be that an API or service only accepts a certain number of calls per minute, and will delay any subsequent attempts until the rate limiting period has expired.
UsePartitioner
To limit concurrent message consumption by partition key on a single bus instance, the partitioner filter can be used. For each message type, a partition key provider must be specified.
The job service state machine is a good example of how to configure the partitioner filter:
var partition = new Partitioner(16, new Murmur3UnsafeHashGenerator());
e.UsePartitioner<JobSubmitted>(partition, p => p.Message.JobId);
e.UsePartitioner<JobSlotAllocated>(partition, p => p.Message.JobId);
e.UsePartitioner<JobSlotUnavailable>(partition, p => p.Message.JobId);
e.UsePartitioner<Fault<AllocateJobSlot>>(partition, p => p.Message.Message.JobId);
e.UsePartitioner<JobAttemptCreated>(partition, p => p.Message.JobId);
e.UsePartitioner<Fault<StartJobAttempt>>(partition, p => p.Message.Message.JobId);
// ...
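Conceptually, a partitioner hashes the partition key and maps it to one of a fixed number of partitions, so messages carrying the same JobId always land in the same partition. The sketch below illustrates that idea only. It assumes a simple modulo mapping and substitutes an FNV-1a hash for Murmur3; PartitionSketch and PartitionFor are hypothetical names, not MassTransit types.

```csharp
using System;

public static class PartitionSketch
{
    // FNV-1a 32-bit hash, used here only as a simple stand-in for Murmur3.
    public static uint Fnv1a(byte[] data)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in data)
            {
                hash ^= b;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Maps a key to a partition index in the range [0, partitionCount).
    // Assumption: partitions are selected by hash modulo partition count.
    public static int PartitionFor(Guid key, int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));

        return (int)(Fnv1a(key.ToByteArray()) % (uint)partitionCount);
    }
}
```

Because the mapping depends only on the key, sharing one partitioner across several message types (as in the configuration above) keeps all messages for a given job in the same partition.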
UseRateLimit
To add a rate limiter to a receive endpoint:
cfg.ReceiveEndpoint("customer_update_queue", e =>
{
    e.UseRateLimit(1000, TimeSpan.FromSeconds(5));
    // other configuration
});
The rate limiter takes two arguments:
Argument | Description |
---|---|
RateLimit | The number of calls allowed in the time period. |
Interval | The time interval before the message count is reset to zero. |
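The relationship between the rate limit and the interval can be illustrated with a fixed-window counter: up to RateLimit messages pass within an interval, and the count returns to zero when the interval elapses. This is a minimal single-threaded sketch, not the filter's actual implementation. FixedWindowCounter and TryAcquire are hypothetical names, and returning false stands in for the delay a real limiter would apply.

```csharp
using System;

// Illustrative fixed-window counter; not thread-safe.
public sealed class FixedWindowCounter
{
    private readonly int _rateLimit;
    private readonly TimeSpan _interval;
    private DateTime _windowStart;
    private int _count;

    public FixedWindowCounter(int rateLimit, TimeSpan interval, DateTime start)
    {
        _rateLimit = rateLimit;
        _interval = interval;
        _windowStart = start;
    }

    // Returns true if a message may be processed at the given time,
    // false if it would have to wait for the next interval.
    public bool TryAcquire(DateTime now)
    {
        // Start a new window and reset the count once the interval has elapsed.
        if (now - _windowStart >= _interval)
        {
            _windowStart = now;
            _count = 0;
        }

        if (_count >= _rateLimit)
            return false;

        _count++;
        return true;
    }
}
```

With the values from the example above (1000 messages per 5 seconds), the average throughput is capped at 200 messages per second, although all 1000 could be consumed in a burst at the start of an interval.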
Concurrency Limit
Use ConcurrentMessageLimit at the bus, endpoint, or consumer level to limit the number of messages processed concurrently.
The concurrency limit filter supports any pipe context (any type that implements PipeContext), which includes most *Context types in MassTransit. For this reason alone, the filter still exists in MassTransit despite being deprecated for concurrent message limit scenarios.
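For the common case of limiting message consumption, the property-based setting might look like the fragment below. It is a sketch that assumes ConcurrentMessageLimit is a settable property on the receive endpoint configurator; the queue and consumer names are reused from the filter example for illustration.

```csharp
// Sketch: limit the endpoint to four concurrently processed messages
// without adding the concurrency limit filter.
cfg.ReceiveEndpoint("submit-order", e =>
{
    e.ConcurrentMessageLimit = 4;
    e.ConfigureConsumer<SubmitOrderConsumer>(context);
});
```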
UseConcurrencyLimit
To use the concurrency limit filter:
cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseConcurrencyLimit(4);
    e.ConfigureConsumer<SubmitOrderConsumer>(context);
});