Middleware Filters

Kill Switch

A Kill Switch is used to prevent failing consumers from moving all the messages from the input queue to the error queue. By monitoring message consumption and tracking message successes and failures, a Kill Switch stops the receive endpoint when a trip threshold has been reached.

Typically, consumer exceptions are caused by transient issues, and suspending consumption until a later time gives the transient issue a chance to be resolved.

A Kill Switch is the messaging analog of a Circuit Breaker, and operates in a similar manner. However, instead of inducing failure to reduce pressure on a backing service, the kill switch stops consuming messages, thereby reducing pressure on backing services.

For background, read Martin Fowler's description of the Circuit Breaker pattern.

UseKillSwitch

A Kill Switch can be configured on an individual receive endpoint or all receive endpoints on the bus. To configure a kill switch on all receive endpoints, add the UseKillSwitch method as shown.

cfg.UseKillSwitch(options => options
    .SetActivationThreshold(10)
    .SetTripThreshold(0.15)
    .SetRestartTimeout(m: 1));

In the above example, the kill switch will activate after 10 messages have been consumed. If the ratio of failures/attempts exceeds 15%, the kill switch will trip and stop the receive endpoint. After 1 minute, the receive endpoint will be restarted. Once restarted, if exceptions are still observed, the receive endpoint will be stopped again for 1 minute.
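The decision described above can be sketched in a few lines of plain C#. This is only an illustration of the arithmetic, not MassTransit's implementation: `ShouldTrip` and its parameters are hypothetical names, and "activates after 10 messages" is assumed to mean "at least 10 attempts in the tracking period".

```csharp
using System;

// Hypothetical helper, not part of MassTransit: decides whether the kill switch
// should trip for the counts observed so far in the tracking period.
static bool ShouldTrip(int attempts, int failures, int activationThreshold, double tripThreshold)
{
    // Below the activation threshold the failure ratio is not evaluated at all.
    if (attempts < activationThreshold)
        return false;

    // Trip when failures/attempts exceeds the threshold (0.15 means 15%).
    return (double)failures / attempts > tripThreshold;
}

Console.WriteLine(ShouldTrip(attempts: 9, failures: 9, activationThreshold: 10, tripThreshold: 0.15));  // False: not yet active
Console.WriteLine(ShouldTrip(attempts: 20, failures: 2, activationThreshold: 10, tripThreshold: 0.15)); // False: 10% failed
Console.WriteLine(ShouldTrip(attempts: 20, failures: 4, activationThreshold: 10, tripThreshold: 0.15)); // True: 20% failed
```

The first call shows why the activation threshold matters: even nine failures out of nine do not stop the endpoint, because too few messages have been seen to judge the ratio.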

A kill switch may be configured on the bus or on individual receive endpoint(s). When configured on the bus, the kill switch is applied to all receive endpoints.

Options

| Option | Description |
|---|---|
| TrackingPeriod | The time window for tracking exceptions. |
| TripThreshold | The percentage of failed messages that triggers the kill switch. Should be 0-100, though values around 5-10 are more realistic. |
| ActivationThreshold | The number of messages that must be consumed before the kill switch activates. |
| RestartTimeout | The wait time before restarting the receive endpoint. |
| ExceptionFilter | By default, all exceptions are tracked. An exception filter can be configured to only track specific exceptions. |
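As a rough sketch, the options above might be combined on a single receive endpoint as follows. The `SetTrackingPeriod` and `SetExceptionFilter` setters are assumed here by analogy with the setters in the earlier example, and `TimeoutException` is only an illustrative choice of exception to track; check the names against the MassTransit version in use. The queue name and consumer are borrowed from the concurrency limit example elsewhere on this page.

```csharp
cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseKillSwitch(options => options
        .SetActivationThreshold(10)
        .SetTripThreshold(0.15)
        // Assumed setter: window over which successes and failures are counted.
        .SetTrackingPeriod(TimeSpan.FromMinutes(1))
        .SetRestartTimeout(m: 1)
        // Assumed setter: only count TimeoutException toward the trip threshold.
        .SetExceptionFilter(f => f.Handle<TimeoutException>()));

    e.ConfigureConsumer<SubmitOrderConsumer>(context);
});
```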

Circuit Breaker

A circuit breaker is used to protect resources (remote, local, or otherwise) from being overloaded when in a failure state. For example, a remote web site may be unavailable, and calling that web site in a message consumer takes 30-60 seconds to time out. By continuing to call the failing service, the service may be unable to recover. A circuit breaker detects the repeated failures and trips, preventing further calls to the service and giving it time to recover. Once the reset interval expires, calls are slowly allowed back to the service. If it is still failing, the breaker remains open, and the reset interval starts over. Once the service returns to healthy, calls flow normally as the breaker closes.

For background, read Martin Fowler's description of the Circuit Breaker pattern.

UseCircuitBreaker

To add the circuit breaker to a receive endpoint:

cfg.UseCircuitBreaker(cb =>
{
    cb.TrackingPeriod = TimeSpan.FromMinutes(1);
    cb.TripThreshold = 15;
    cb.ActiveThreshold = 10;
    cb.ResetInterval = TimeSpan.FromMinutes(5);
});

Options

There are four options that can be adjusted on a circuit breaker.

| Option | Description |
|---|---|
| TrackingPeriod | The time window for tracking exceptions. |
| TripThreshold | This is a percentage, and is based on the ratio of failed attempts to total attempts. When set to 15, if the ratio exceeds 15%, the circuit breaker opens and remains open until the ResetInterval expires. |
| ActiveThreshold | The number of messages that must reach the circuit breaker in a tracking period before the circuit breaker can trip. If set to 10, the trip threshold is not evaluated until at least 10 messages have been received. |
| ResetInterval | The period of time between the circuit breaker trip and the first attempt to close the circuit breaker. Messages that reach the circuit breaker during the open period will immediately fail with the same exception that tripped the circuit breaker. |
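The open, trial, and closed behavior described in this section can be sketched as a small state machine. This is a generic illustration of the pattern, not MassTransit's implementation: the state names and the `Trip`, `AllowCall`, and `Report` helpers are hypothetical, and the threshold evaluation that causes the initial trip is left out.

```csharp
using System;

var resetInterval = TimeSpan.FromMinutes(5);
var state = "Closed";
DateTime openedAt = default;

// Called when the trip threshold is exceeded, or when a trial call fails.
void Trip(DateTime now) { state = "Open"; openedAt = now; }

// Returns true if a call may proceed at time 'now'.
bool AllowCall(DateTime now)
{
    if (state == "Open" && now - openedAt >= resetInterval)
        state = "HalfOpen";                    // reset interval expired: allow a trial call
    return state != "Open";
}

// Reports the outcome of a call that was allowed through.
void Report(bool success, DateTime now)
{
    if (success) state = "Closed";             // healthy again: calls flow normally
    else if (state == "HalfOpen") Trip(now);   // still failing: reopen, interval starts over
}

var t0 = new DateTime(2024, 1, 1, 12, 0, 0);
Trip(t0);
Console.WriteLine(AllowCall(t0.AddMinutes(1)));   // False: breaker is open
Console.WriteLine(AllowCall(t0.AddMinutes(5)));   // True: trial call allowed
Report(false, t0.AddMinutes(5));                  // trial failed, breaker reopens
Console.WriteLine(AllowCall(t0.AddMinutes(6)));   // False: interval started over
Console.WriteLine(AllowCall(t0.AddMinutes(10)));  // True: second trial call
Report(true, t0.AddMinutes(10));
Console.WriteLine(state);                         // Closed
```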

Rate Limiter

A rate limiter is used to restrict the number of messages processed within a time period. The reason may be that an API or service only accepts a certain number of calls per minute, and will delay any subsequent attempts until the rate limiting period has expired.

The rate limiter will delay message delivery until the rate limit expires, so it is best to avoid large time windows and keep the rate limits sane. Something like 1000 over 10 minutes is a bad idea, versus 100 over a minute. Try to adjust the values and see what works for you.
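To make the comparison concrete, the following sketch models a simple fixed window in which the message count resets at the end of each interval. It is a simplified model under that assumption, not MassTransit's implementation, and `ReleaseOffset` is a hypothetical helper. It computes how long the message at a given zero-based position in a backlog waits before it is released.

```csharp
using System;

// Hypothetical helper: with rateLimit messages allowed per interval, the message at
// zero-based position 'index' in a backlog is released after this much time.
static TimeSpan ReleaseOffset(int index, int rateLimit, TimeSpan interval)
    => TimeSpan.FromTicks(interval.Ticks * (index / rateLimit));

var tenMinutes = TimeSpan.FromMinutes(10);
var oneMinute = TimeSpan.FromMinutes(1);

// 1000 per 10 minutes: the first 1000 go through at once, then nothing for 10 minutes.
Console.WriteLine(ReleaseOffset(999, 1000, tenMinutes));   // 00:00:00
Console.WriteLine(ReleaseOffset(1000, 1000, tenMinutes));  // 00:10:00

// 100 per minute: same average rate, but messages keep flowing every minute.
Console.WriteLine(ReleaseOffset(100, 100, oneMinute));     // 00:01:00
Console.WriteLine(ReleaseOffset(1000, 100, oneMinute));    // 00:10:00
```

Both settings average 100 messages per minute, but the larger window allows a burst followed by a stall of up to ten minutes, while the smaller window caps any stall at one minute.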

UsePartitioner

To limit concurrent message consumption by partition key on a single bus instance, the partitioner filter can be used. For each message type, a partition key provider must be specified.

The job service state machine is a good example of how to configure the partitioner filter:

var partition = new Partitioner(16, new Murmur3UnsafeHashGenerator());

e.UsePartitioner<JobSubmitted>(partition, p => p.Message.JobId);
e.UsePartitioner<JobSlotAllocated>(partition, p => p.Message.JobId);

e.UsePartitioner<JobSlotUnavailable>(partition, p => p.Message.JobId);
e.UsePartitioner<Fault<AllocateJobSlot>>(partition, p => p.Message.Message.JobId);

e.UsePartitioner<JobAttemptCreated>(partition, p => p.Message.JobId);
e.UsePartitioner<Fault<StartJobAttempt>>(partition, p => p.Message.Message.JobId);

// ...

This filter does not partition across load balanced consumer instances. If load-balanced, partitioned, in-order message consumption is needed, consider using the SQL Transport.
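The idea behind partitioning by key can be sketched as hashing the key and taking the result modulo the partition count, so that messages with the same key always land in the same partition and are handled one at a time. The sketch below is a minimal illustration under that assumption: `PartitionFor` is a hypothetical helper, a string key is used for simplicity, and FNV-1a stands in for the Murmur3 hash generator used in the example above.

```csharp
using System;
using System.Text;

// Hypothetical helper, not MassTransit's Partitioner: maps a key to one of
// partitionCount slots using an FNV-1a hash of the key's UTF-8 bytes.
static int PartitionFor(string key, int partitionCount)
{
    uint hash = 2166136261;
    foreach (byte b in Encoding.UTF8.GetBytes(key))
        hash = unchecked((hash ^ b) * 16777619u);

    return (int)(hash % (uint)partitionCount);
}

var first = PartitionFor("job-42", 16);
var second = PartitionFor("job-42", 16);

Console.WriteLine(first == second);            // True: same key, same partition
Console.WriteLine(first >= 0 && first < 16);   // True: always a valid partition index
```

Because the mapping is computed in-process, it only orders messages within one bus instance, which is why the filter does not help across load-balanced instances.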

UseRateLimit

To add a rate limiter to a receive endpoint:

cfg.ReceiveEndpoint("customer_update_queue", e =>
{
    e.UseRateLimit(1000, TimeSpan.FromSeconds(5));
    // other configuration
});

The rate limiter takes two arguments:

RateLimit

The number of calls allowed in the time period.

Interval

The time interval before the message count is reset to zero.

Concurrency Limit

The concurrency limit filter has been deprecated for most scenarios. Developers should instead specify a ConcurrentMessageLimit at the bus, endpoint, or consumer level to limit the number of messages processed concurrently.

The concurrency limit filter supports any pipe context (any type that implements PipeContext, which includes most *Context types in MassTransit). For this reason alone, the filter still exists in MassTransit despite being deprecated in concurrent message limit scenarios.

UseConcurrencyLimit

To use the concurrency limit filter:

cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseConcurrencyLimit(4);

    e.ConfigureConsumer<SubmitOrderConsumer>(context);
});
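For comparison, the recommended alternative at the endpoint level might look like the following sketch, which sets ConcurrentMessageLimit on the same receive endpoint instead of adding the filter. The value 4 simply mirrors the example above.

```csharp
cfg.ReceiveEndpoint("submit-order", e =>
{
    // Limits how many messages this endpoint processes concurrently,
    // without adding the concurrency limit filter to the pipeline.
    e.ConcurrentMessageLimit = 4;

    e.ConfigureConsumer<SubmitOrderConsumer>(context);
});
```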