Consumers
To understand consumers and how to create one, refer to the Consumers section.
High-level concepts covered in this configuration section include:
Concept | Description |
---|---|
Consumer | A class that consumes one or more message types, one for each implementation of IConsumer<T> |
Batch Consumer | A class that consumes multiple messages in batches, by implementing IConsumer<Batch<T>> |
Job Consumer | A class that consumes a job message, specified by the IJobConsumer<T> interface |
Consumer Definition | A class, derived from ConsumerDefinition<TConsumer> that configures settings and the consumer's receive endpoint |
Receive Endpoint | Receives messages from a broker queue and delivers those messages to consumer types configured on the receive endpoint |
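As a minimal sketch of the first and fourth rows of the table, a consumer and its definition might look like the following. The MyMessage contract, its CustomerId property, the endpoint name, and the limit value are assumptions made for illustration only.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, assumed for this example.
public record MyMessage(Guid CustomerId);

// A consumer implements IConsumer<T> once per message type it handles.
public class MyConsumer : IConsumer<MyMessage>
{
    public Task Consume(ConsumeContext<MyMessage> context)
    {
        // context.Message is the deserialized message.
        Console.WriteLine($"Received message for customer {context.Message.CustomerId}");
        return Task.CompletedTask;
    }
}

// A definition carries settings for the consumer and its receive endpoint.
public class MyConsumerDefinition : ConsumerDefinition<MyConsumer>
{
    public MyConsumerDefinition()
    {
        // Illustrative values: override the default endpoint name
        // and limit how many messages are consumed concurrently.
        EndpointName = "my-consumer";
        ConcurrentMessageLimit = 4;
    }
}
```

Keeping these settings in the definition rather than inline means they travel with the consumer wherever it is registered.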
Consumers can be added in many ways, allowing either a simple or a fine-grained approach to registration. Consumers are added inside the AddMassTransit
configuration, but before the transport.
using MassTransit;

services.AddMassTransit(x =>
{
    x.AddConsumer<MyConsumer>();

    x.Using[Transport]((context, cfg) =>
    {
        // transport, middleware, other configuration

        cfg.ConfigureEndpoints(context);
    });
});
Adding Consumers
Adds a single consumer, with all defaults
AddConsumer<MyConsumer>();
AddConsumer(typeof(MyConsumer));
Adds a consumer, with a consumer definition.
AddConsumer<MyConsumer, MyConsumerDefinition>();
AddConsumer(typeof(MyConsumer), typeof(MyConsumerDefinition));
Adds a consumer with a matching consumer definition and configures the consumer pipeline.
AddConsumer<MyConsumer, MyConsumerDefinition>(cfg =>
{
    cfg.ConcurrentMessageLimit = 8;
});
Adds the specified consumers and consumer definitions. When consumer definitions are included they will be added with the matching consumer type.
void AddConsumers(params Type[] types);
Adds all consumers and consumer definitions in the specified assembly or assemblies.
void AddConsumers(params Assembly[] assemblies);
Adds the consumers and any matching consumer definitions in the specified assembly or assemblies that pass the filter. The filter is only called for consumer types.
void AddConsumers(Func<Type, bool> filter, params Assembly[] assemblies);
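The filtered overload can be sketched as follows. This is an illustration under stated assumptions: the ConsumerRegistration class, the AddMessaging method, and the ".Consumers" namespace convention are hypothetical, and the in-memory transport is used only to keep the example free of broker settings.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class ConsumerRegistration
{
    // Hypothetical extension method wrapping the registration.
    public static IServiceCollection AddMessaging(this IServiceCollection services)
    {
        return services.AddMassTransit(x =>
        {
            // Scan this assembly, keeping only consumer types whose namespace
            // ends with ".Consumers" (an assumed project convention).
            // Matching consumer definitions are picked up with their consumers.
            x.AddConsumers(
                type => type.Namespace != null
                    && type.Namespace.EndsWith(".Consumers", StringComparison.Ordinal),
                typeof(ConsumerRegistration).Assembly);

            // In-memory transport, chosen here only for simplicity.
            x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
        });
    }
}
```

Dropping the filter argument gives the assembly-only overload, which registers every consumer found in the assembly.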
Batch Options
If you want your consumer to process multiple messages at a time, you can configure a Batch Consumer. This is a consumer that implements IConsumer<Batch<TMessage>>.
AddConsumer<MyBatchConsumer>(cfg =>
{
    cfg.Options<BatchOptions>(options => options
        .SetMessageLimit(100)
        .SetTimeLimit(s: 1)
        .SetTimeLimitStart(BatchTimeLimitStart.FromLast)
        .GroupBy<MyMessage>(x => x.CustomerId)
        .SetConcurrencyLimit(10));
});
Property | Type | Default | Description |
---|---|---|---|
MessageLimit | int | 10 | Max number of messages in a batch |
ConcurrencyLimit | int | 1 | Number of concurrent batches |
TimeLimit | TimeSpan | 1 sec | Maximum time to wait before delivering a partial batch |
TimeLimitStart | BatchTimeLimitStart | FromFirst | Whether the time limit is measured from the first or the last message received |
GroupKeyProvider | object? | null | The property to group by |
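For context, the consumer these options apply to might look like the sketch below. The MyMessage contract and its CustomerId property are assumed from the GroupBy call in the configuration; the console output is purely illustrative.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, assumed for this example.
public record MyMessage(Guid CustomerId);

public class MyBatchConsumer : IConsumer<Batch<MyMessage>>
{
    public Task Consume(ConsumeContext<Batch<MyMessage>> context)
    {
        Batch<MyMessage> batch = context.Message;
        Console.WriteLine($"Received a batch of {batch.Length} messages");

        // Each element of the batch is the ConsumeContext of one message.
        foreach (ConsumeContext<MyMessage> item in batch)
        {
            Console.WriteLine($"  customer {item.Message.CustomerId}");
        }

        return Task.CompletedTask;
    }
}
```

A batch is delivered when either the message limit or the time limit is reached, so the consumer should not assume a fixed batch size.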
Job Options
If your consumer needs to work for an extended period of time, greater than a second, you may want to register the consumer as a job consumer. You can read more about this feature in the Job Consumer pattern section.
AddConsumer<MyJobConsumer>(cfg =>
{
    cfg.Options<JobOptions<MyJob>>(options => options
        .SetJobTimeout(TimeSpan.FromMinutes(15))
        .SetConcurrentJobLimit(10)
        .SetRetry(r => r.Interval(3, TimeSpan.FromSeconds(30))));
});
Property | Type | Default | Description |
---|---|---|---|
JobTimeout | TimeSpan | 5 minutes | Maximum time the job is allowed to run |
ConcurrentJobLimit | int | 1 | Number of concurrent executing jobs |
RetryPolicy | IRetryPolicy | None | How should failures be retried, if at all |
Retry Policies
- None: No retries
- Immediate: retry N times, with an optional exception filter
- Intervals: retry N times, with a pause between and an optional exception filter
- Incremental: retry N times, with an increasing pause between and an optional exception filter
- Exponential: retry N times, with an ever increasing pause between and an optional exception filter
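The policies above correspond to methods on the retry configurator. The sketch below shows one call per policy; the RetryExamples class, the retry counts, the intervals, and the choice of TimeoutException as the filtered exception are all illustrative assumptions.

```csharp
using System;
using MassTransit;

// Hypothetical helper class; each method configures one retry policy.
public static class RetryExamples
{
    // None: failures are not retried.
    public static void None(IRetryConfigurator r) => r.None();

    // Immediate: retry up to 5 times with no pause.
    public static void Immediate(IRetryConfigurator r) => r.Immediate(5);

    // Interval: retry up to 3 times, pausing 5 seconds between attempts.
    public static void Interval(IRetryConfigurator r) =>
        r.Interval(3, TimeSpan.FromSeconds(5));

    // Incremental: start at 1 second and add 2 seconds after each attempt.
    // The exception filter limits retries to TimeoutException.
    public static void Incremental(IRetryConfigurator r)
    {
        r.Handle<TimeoutException>();
        r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));
    }

    // Exponential: pauses grow between a 1 second minimum and a 1 minute maximum.
    public static void Exponential(IRetryConfigurator r) =>
        r.Exponential(5, TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(5));
}
```

Any of these methods can be passed wherever a retry configuration callback is expected, for example e.UseMessageRetry(RetryExamples.Incremental) on a receive endpoint.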
Configuring Endpoints
By default, MassTransit requires no explicit configuration of endpoints; they can be created automatically by calling ConfigureEndpoints. You can customize this behavior using a ConsumerDefinition or by specifying the endpoint configuration inline.
using MassTransit;

services.AddMassTransit(x =>
{
    // Step 1: Add Consumers Here

    // Step 2: Select a Transport
    x.Using[Transport]((context, cfg) =>
    {
        // Step 3: Configure the Transport

        // Step 4: Configure Endpoints
        // All consumers registered in step 1 will get
        // default endpoints created.
        cfg.ConfigureEndpoints(context);
    });
});
Customized Endpoints
To manually configure a consumer on a receive endpoint, use one of the following methods. You may want to do this for the following reason:
- Group consumers onto a specific queue, instead of the default of one queue per consumer
cfg.ReceiveEndpoint("manually-configured", e =>
{
    // configure endpoint-specific settings first
    e.SomeEndpointSetting = someValue;

    // configure any required middleware components next
    e.UseMessageRetry(r => r.Interval(5, 1000));

    // configure the consumer last
    e.ConfigureConsumer<MyConsumer>(context);
});

// configure any remaining consumers, sagas, etc.
cfg.ConfigureEndpoints(context);
Endpoint Configuration is Custom by Transport
Consumer Configuration
ConfigureConsumer<T>(context);
Configures the consumer on the receive endpoint.
ConfigureConsumer<T>(context, consumer =>
{
    // configure consumer-specific middleware
});
Configures the consumer on the receive endpoint and applies the additional consumer configuration to the consumer pipeline.
ConfigureConsumers(context);
Configures all consumers that haven't been configured on the receive endpoint.