Consumers
To understand consumers and how to create one, refer to the Consumers section.
High-level concepts covered in this configuration section include:
| Concept | Description |
|---|---|
| Consumer | A class that consumes one or more message types, one for each implementation of IConsumer<TMessage> |
| Batch Consumer | A class that consumes multiple messages in batches, by implementing IConsumer<Batch<TMessage>> |
| Job Consumer | A class that consumes a job message, specified by the IJobConsumer<TJob> interface |
| Consumer Definition | A class, derived from ConsumerDefinition<TConsumer>, that configures settings and the consumer's receive endpoint |
| Receive Endpoint | Receives messages from a broker queue and delivers those messages to consumer types configured on the receive endpoint |
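As a minimal sketch of the first row in the table, a consumer is a class that implements IConsumer<TMessage> for each message type it handles. The SubmitOrder contract and its OrderId property are hypothetical names chosen for illustration; they are not part of MassTransit.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, used only for illustration
public record SubmitOrder
{
    public string OrderId { get; init; } = "";
}

// Consumes a single message type by implementing IConsumer<SubmitOrder>
public class MyConsumer :
    IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        // context.Message is the deserialized message instance
        Console.WriteLine($"Received order {context.Message.OrderId}");

        return Task.CompletedTask;
    }
}
```

A class that handles several message types would implement IConsumer<TMessage> once per type.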
Consumers can be added in many ways, allowing either a simple or a fine-grained approach to registration. Consumers are added inside the AddMassTransit
configuration, but before the transport.
using MassTransit;
services.AddMassTransit(x =>
{
    x.AddConsumer<MyConsumer>();
    x.Using[Transport]((context, cfg) =>
    {
        // transport, middleware, other configuration
        cfg.ConfigureEndpoints(context);
    });
});
Adding Consumers
Adds a single consumer, with all defaults.
AddConsumer<MyConsumer>();
AddConsumer(typeof(MyConsumer));
Adds a consumer, with a consumer definition.
AddConsumer<MyConsumer, MyConsumerDefinition>();
AddConsumer(typeof(MyConsumer), typeof(MyConsumerDefinition));
Adds a consumer with a matching consumer definition and configures the consumer pipeline.
AddConsumer<MyConsumer, MyConsumerDefinition>(cfg =>
{
    cfg.ConcurrentMessageLimit = 8;
});
Adds the specified consumers and consumer definitions. When consumer definitions are included they will be added with the matching consumer type.
void AddConsumers(params Type[] types);
Adds all consumers and consumer definitions in the specified assembly or assemblies.
void AddConsumers(params Assembly[] assemblies);
Adds the consumers, and any matching consumer definitions, in the specified assembly or assemblies that pass the filter. The filter is only called for consumer types.
void AddConsumers(Func<Type, bool> filter, params Assembly[] assemblies);
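The assembly-scanning overloads might be used as in the sketch below. The MyApp.Orders namespace, the SubmitOrder contract, and the in-memory transport are assumptions made so the example is self-contained; substitute your own namespace and transport.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    // Register only the consumers whose namespace starts with "MyApp.Orders"
    // (a hypothetical namespace). The filter is called for consumer types only.
    x.AddConsumers(
        type => type.Namespace?.StartsWith("MyApp.Orders", StringComparison.Ordinal) == true,
        typeof(MyApp.Orders.MyConsumer).Assembly);

    // To register every consumer in the assembly instead, drop the filter:
    // x.AddConsumers(typeof(MyApp.Orders.MyConsumer).Assembly);

    // In-memory transport, assumed here only to keep the sketch self-contained
    x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
});

namespace MyApp.Orders
{
    // Hypothetical message contract and consumer
    public record SubmitOrder;

    public class MyConsumer :
        IConsumer<SubmitOrder>
    {
        public Task Consume(ConsumeContext<SubmitOrder> context) => Task.CompletedTask;
    }
}
```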
Batch Options
If you want your consumer to process multiple messages at a time, you can configure a Batch Consumer. This is a consumer that implements
IConsumer<Batch<TMessage>>.
AddConsumer<MyBatchConsumer>(cfg =>
{
    cfg.Options<BatchOptions>(options => options
        .SetMessageLimit(100)
        .SetTimeLimit(s: 1)
        .SetTimeLimitStart(BatchTimeLimitStart.FromLast)
        .GroupBy<MyMessage, string>(x => x.CustomerId)
        .SetConcurrencyLimit(10));
});
| Property | Type | Default | Description |
|---|---|---|---|
| MessageLimit | int | 10 | Maximum number of messages in a batch |
| ConcurrencyLimit | int | 1 | Number of concurrent batches |
| TimeLimit | TimeSpan | 1 sec | Maximum time to wait before delivering a partial batch |
| TimeLimitStart | BatchTimeLimitStart | FromFirst | Whether the time limit starts from the first or the last message added to the batch |
| GroupKeyProvider | object? | null | The property to group by |
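For reference, a batch consumer that fits the configuration above could look like the following sketch. MyMessage and its CustomerId property mirror the names used in the GroupBy call; the body of the contract is an assumption.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract matching the GroupBy example
public record MyMessage
{
    public string CustomerId { get; init; } = "";
}

public class MyBatchConsumer :
    IConsumer<Batch<MyMessage>>
{
    public Task Consume(ConsumeContext<Batch<MyMessage>> context)
    {
        Batch<MyMessage> batch = context.Message;

        // Each entry in the batch is a ConsumeContext for one message
        for (var i = 0; i < batch.Length; i++)
        {
            ConsumeContext<MyMessage> item = batch[i];
            Console.WriteLine($"Customer {item.Message.CustomerId}");
        }

        return Task.CompletedTask;
    }
}
```

Because the example groups by CustomerId, each batch delivered to the consumer should contain messages for a single customer.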
Job Options
If your consumer needs to work for an extended period of time, greater than a second, you may want to register the consumer as a job consumer. You can read more about this feature in the Job Consumer pattern section.
AddConsumer<MyJobConsumer>(cfg =>
{
    cfg.Options<JobOptions<MyJob>>(options => options
        .SetJobTimeout(TimeSpan.FromMinutes(15))
        .SetConcurrentJobLimit(10)
        .SetRetry(r => r.Interval(5, 30000)));
});
| Property | Type | Default | Description |
|---|---|---|---|
| JobTimeout | TimeSpan | 5 minutes | Maximum time the job is allowed to run |
| ConcurrentJobLimit | int | 1 | Number of concurrent executing jobs |
| RetryPolicy | IRetryPolicy | None | How should failures be retried, if at all |
| JobTypeName | string | Job Type | Override the default job type name used in the JobTypeSaga table (display only) |
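A job consumer for the registration above might be sketched as follows. The MyJob contract and its Path property are hypothetical, and the delay stands in for real long-running work.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical job contract
public record MyJob
{
    public string Path { get; init; } = "";
}

public class MyJobConsumer :
    IJobConsumer<MyJob>
{
    public async Task Run(JobContext<MyJob> context)
    {
        Console.WriteLine($"Processing {context.Job.Path}");

        // Stand-in for long-running work; passing the context's
        // cancellation token lets the work stop when the job is canceled
        await Task.Delay(TimeSpan.FromSeconds(30), context.CancellationToken);
    }
}
```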
Retry Policies
- None: No retries
- Immediate: Retry N times, with an optional exception filter
- Intervals: Retry N times, with a pause between and an optional exception filter
- Incremental: Retry N times, with an increasing pause between and an optional exception filter
- Exponential: Retry N times, with an ever increasing pause between and an optional exception filter
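The policies listed above map onto calls on the retry configurator, roughly as sketched here. The RetryPolicyExamples class, the retry counts, and the delays are illustrative assumptions, not recommended values.

```csharp
using System;
using MassTransit;

// Hypothetical helper class; each method configures one retry policy
public static class RetryPolicyExamples
{
    // None: no retries
    public static void None(IRetryConfigurator r) => r.None();

    // Immediate: up to 5 retries with no delay, only for TimeoutException
    public static void Immediate(IRetryConfigurator r)
    {
        r.Handle<TimeoutException>(); // optional exception filter
        r.Immediate(5);
    }

    // Interval: up to 5 retries, pausing 1 second between attempts
    public static void Interval(IRetryConfigurator r) =>
        r.Interval(5, TimeSpan.FromSeconds(1));

    // Incremental: up to 5 retries, starting at 1 second and adding 2 seconds each time
    public static void Incremental(IRetryConfigurator r) =>
        r.Incremental(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));

    // Exponential: up to 5 retries, with delays growing between 1 and 30 seconds
    public static void Exponential(IRetryConfigurator r) =>
        r.Exponential(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(2));
}
```

Any of these could then be passed to the job options, for example `options.SetRetry(r => RetryPolicyExamples.Incremental(r))`.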
Configuring Endpoints
By default, MassTransit requires no explicit configuration of endpoints; they are created
automatically by calling ConfigureEndpoints. You can customize this behavior using a ConsumerDefinition
or by specifying the endpoint configuration inline.
using MassTransit;
services.AddMassTransit(x =>
{
    // Step 1: Add Consumers Here
    // Step 2: Select a Transport
    x.Using[Transport]((context, cfg) =>
    {
        // Step 3: Configure the Transport
        // Step 4: Configure Endpoints
        // All consumers registered in step 1 will get
        // default endpoints created.
        cfg.ConfigureEndpoints(context);
    });
});
Customized Endpoints
To manually configure a consumer on a receive endpoint, use one of the following methods. You may want to do this for the following reason:
- Group consumers onto a specific queue, instead of the default of one queue per consumer
cfg.ReceiveEndpoint("manually-configured", e =>
{
    // configure endpoint-specific settings first
    e.SomeEndpointSetting = someValue;
    // configure any required middleware components next
    e.UseMessageRetry(r => r.Interval(5, 1000));
    // configure the consumer last
    e.ConfigureConsumer<MyConsumer>(context);
});
// configure any remaining consumers, sagas, etc.
cfg.ConfigureEndpoints(context);
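Grouping consumers onto one queue, the reason given above, could be sketched like this. The order-events queue name, the two consumers and their message contracts are hypothetical, and the in-memory transport is assumed only so the example is self-contained.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    x.AddConsumer<OrderSubmittedConsumer>();
    x.AddConsumer<OrderCanceledConsumer>();

    x.UsingInMemory((context, cfg) =>
    {
        // Both consumers share the "order-events" queue
        cfg.ReceiveEndpoint("order-events", e =>
        {
            // middleware before consumers
            e.UseMessageRetry(r => r.Interval(5, 1000));

            e.ConfigureConsumer<OrderSubmittedConsumer>(context);
            e.ConfigureConsumer<OrderCanceledConsumer>(context);
        });

        // Consumers not configured above still get default endpoints
        cfg.ConfigureEndpoints(context);
    });
});

// Hypothetical message contracts and consumers
public record OrderSubmitted;
public record OrderCanceled;

public class OrderSubmittedConsumer :
    IConsumer<OrderSubmitted>
{
    public Task Consume(ConsumeContext<OrderSubmitted> context) => Task.CompletedTask;
}

public class OrderCanceledConsumer :
    IConsumer<OrderCanceled>
{
    public Task Consume(ConsumeContext<OrderCanceled> context) => Task.CompletedTask;
}
```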
Endpoint Configuration is Custom by Transport
Consumer Configuration
ConfigureConsumer<T>(context);
Configures the consumer on the receive endpoint.
ConfigureConsumer<T>(context, consumer =>
{
    // configure consumer-specific middleware
});
Configures the consumer on the receive endpoint and applies the additional consumer configuration to the consumer pipeline.
ConfigureConsumers(context);
Configures all consumers that haven't been configured on the receive endpoint.
Consumer Definitions
Inside a consumer definition you can control all of the settings for a consumer and its associated endpoint.
public class SubmitOrderConsumerDefinition :
    ConsumerDefinition<SubmitOrderConsumer>
{
    public SubmitOrderConsumerDefinition()
    {
        // override the default endpoint name, for whatever reason
        EndpointName = "ha-submit-order";
        // limit the number of messages consumed concurrently
        // this applies to the consumer only, not the endpoint
        ConcurrentMessageLimit = 4;
    }
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(r => r.Interval(5, 1000));
        endpointConfigurator.UseInMemoryOutbox();
    }
}
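A definition takes effect once it is registered together with its consumer. The sketch below pairs the two explicitly; the SubmitOrder contract, the consumer body, and the in-memory transport are assumptions, and the definition is a trimmed copy of the one above so the example compiles on its own.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    // Pair the consumer with its definition explicitly
    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();

    x.UsingInMemory((context, cfg) =>
    {
        // The settings from the definition are applied when the
        // consumer's endpoint is configured here
        cfg.ConfigureEndpoints(context);
    });
});

// Hypothetical message contract and consumer
public record SubmitOrder;

public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context) => Task.CompletedTask;
}

// Trimmed copy of the definition shown above
public class SubmitOrderConsumerDefinition :
    ConsumerDefinition<SubmitOrderConsumer>
{
    public SubmitOrderConsumerDefinition()
    {
        EndpointName = "ha-submit-order";
        ConcurrentMessageLimit = 4;
    }
}
```

When consumers are added by assembly scanning with AddConsumers, a definition in the same assembly is matched to its consumer without the explicit pairing.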
Endpoint Options
| Concept | Type | Description |
|---|---|---|
| EndpointDefinition | IEndpointDefinition<T> | ?? |
| EndpointName | string | The name of the queue that will be generated |
Consumer Options
| Concept | Type | Description |
|---|---|---|
| ConcurrentMessageLimit | int | The number of messages this consumer can process concurrently |