Consumers
To understand consumers and how to create one, refer to the Consumers section.
High-level concepts covered in this configuration section include:
Concept | Description |
---|---|
Consumer | A class that consumes one or more messages types, one for each implementation of IConsumer<T> |
Batch Consumer | A class that consumes multiple messages in batches, by implementing IConsumer<Batch<T>> |
Job Consumer | A class that consumes a job message, specified by the IJobConsumer<T> interface |
Consumer Definition | A class, derived from ConsumerDefinition<TConsumer> that configures settings and the consumer's receive endpoint |
Receive Endpoint | Receives messages from a broker queue and delivers those messages to consumer types configured on the receive endpoint |
Consumers can be added many ways allowing either a simple of fine-grained approach to registration. Consumers are added inside the AddMassTransit
configuration, but before the transport.
using MassTransit;
services.AddMassTransit(x =>
{
x.AddConsumer<MyConsumer>();
x.Using[Transport]((context, cfg) =>
{
// transport, middleware, other configuration
cfg.ConfigureEndpoints(context);
});
});
Adding Consumers
Adds a single consumer, with all defaults
AddConsumer<MyConsumer>();
AddConsumer(typeof(MyConsumer));
Adds a consumer, with a consumer definition.
AddConsumer<MyConsumer, MyConsumerDefinition>();
AddConsumer(typeof(MyConsumer), typeof(MyConsumerDefinition));
Adds a consumer with a matching consumer definition and configures the consumer pipeline.
AddConsumer<MyConsumer, MyConsumerDefinition>(cfg =>
{
cfg.ConcurrentMessageLimit = 8;
});
Adds the specified consumers and consumer definitions. When consumer definitions are included they will be added with the matching consumer type.
void AddConsumers(params Type[] types);
Adds all consumers and consumer definitions in the specified an assembly or assemblies.
void AddConsumers(params Assembly[] assemblies);
Adds the consumers and any matching consumer definitions in the specified an assembly or assemblies that pass the filter. The filter is only called for consumer types.
void AddConsumers(Func<Type, bool> filter, params Assembly[] assemblies);
Batch Options
If you want your consumer to process multiple messages at a time, you can configure a Batch Consumer
. This
is a consumer that implements IConsumer<Batch<TMessage>>
.
AddConsumer<MyBatchConsumer>(cfg =>
{
cfg.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(s: 1)
.SetTimeLimitStart(BatchTimeLimitStart.FromLast)
.GroupBy<MyMessage, string>(x => x.CustomerId)
.SetConcurrencyLimit(10));
});
Property | Type | Default | Description |
---|---|---|---|
MessageLimit | int | 10 | Max number of messages in a batch |
ConcurrencyLimit | int | 1 | number of concurrent batches |
TimeLimit | TimeSpan | 1 sec | maximum time to wait before delivering a partial batch |
TimeLimitStart | TimeSpan | From First | starting point |
GroupKeyProvider | object? | null | the property to group by |
Job Options
If your consumer needs to work for an extended period of time, greater than a second, you may want to register the consumer as a job consumer. You can read more about this feature in the Job Consumer pattern section.
AddConsumer<MyJobConsumer>(cfg =>
{
cfg.Options<JobOptions<MyJob>>(options => options
.SetJobTimeout(TimeSpan.FromMinutes(15))
.SetConcurrentJobLimit(10)
.SetRetry(r => r.Interval(5,30000)));
});
Property | Type | Default | Description |
---|---|---|---|
JobTimeout | TimeSpan | 5 minutes | Maximum time the job is allowed to run |
ConcurrentJobLimit | int | 1 | Number of concurrent executing jobs |
RetryPolicy | IRetryPolicy | None | How should failures be retried, if at all |
JobTypeName | string | Job Type | Override the default job type name used in the JobTypeSaga table (display one) |
Retry Policies
- None: No retries
- Immediate: retry N times, with an optional exception filter
- Intervals: retry N times, with a pause between and an optional exception filter
- Incremental: retry N times, with an increasing pause between and an optional exception filter
- Exponential: retry N times, with an ever increasing pause between and an optional exception filter
Configuring Endpoints
By default MassTransit requires no explicit configuration of endpoints, and can be created
automatically by calling ConfigureEndpoints
. You can customize this behavior using ConsumerDefinition
or by specifying the endpoint configuration inline.
using MassTransit;
services.AddMassTransit(x =>
{
// Step 1: Add Consumers Here
// Step 2: Select a Transport
x.Using[Transport]((context, cfg) => {
// Step 3: Configure the Transport
// Step 4: Configure Endpoints
// All consumers registered in step 1, will get
// default endpoints created.
cfg.ConfigureEndpoints(context);
});
});
Customized Endpoints
To manually configure a consumer on a receive endpoint, use one of the following methods. You may want to do this for the following reasons.
- Group Consumers onto a specific queue, vs the default of one queue per consumer
cfg.ReceiveEndpoint("manually-configured", e =>
{
// configure endpoint-specific settings first
e.SomeEndpointSetting = someValue;
// configure any required middleware components next
e.UseMessageRetry(r => r.Interval(5, 1000));
// configure the consumer last
e.ConfigureConsumer<MyConsumer>(context);
});
// configure any remaining consumers, sagas, etc.
cfg.ConfigureEndpoints(context);
Endpoint Configuration is Custom by Transport
Consumer Configuration
ConfigureConsumer<T>(context);
Configures the consumer on the receive endpoint.
ConfigureConsumer<T>(context, consumer =>
{
// configure consumer-specific middleware
});
Configures the consumer on the receive endpoint and applies the additional consumer configuration to the consumer pipeline.
ConfigureConsumers(context);
Configures all consumers that haven't been configured on the receive endpoint.
Consumer Definitions
Inside of a consumer definition you can control all of the definitions about a consumer and its associated endpoint.
public class SubmitOrderConsumerDefinition :
ConsumerDefinition<SubmitOrderConsumer>
{
public SubmitOrderConsumerDefinition()
{
// override the default endpoint name, for whatever reason
EndpointName = "ha-submit-order";
// limit the number of messages consumed concurrently
// this applies to the consumer only, not the endpoint
ConcurrentMessageLimit = 4;
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<DiscoveryPingConsumer> consumerConfigurator)
{
endpointConfigurator.UseMessageRetry(r => r.Interval(5, 1000));
endpointConfigurator.UseInMemoryOutbox();
}
}
Endpoint Options
Concept | Type | Description |
---|---|---|
EndpointDefinition | IEndpointDefinition<T> | ?? |
EndpointName | string | the name of the queue that will be generated |
Consumer Options
Concept | Type | Description |
---|---|---|
ConcurrentMessageLimit | int | the number of messages THIS consumer can process concurrently |