Consumers
To understand consumers and how to create one, refer to the Consumers section.
Adding Consumers
Consumers are added inside the AddMassTransit configuration using any of the following methods.
AddConsumer<MyConsumer>();
Adds a consumer.
AddConsumer<MyConsumer, MyConsumerDefinition>();
Adds a consumer with a matching consumer definition.
AddConsumer<MyConsumer, MyConsumerDefinition>(cfg =>
{
    cfg.ConcurrentMessageLimit = 8;
});
Adds a consumer with a matching consumer definition and configures the consumer pipeline.
AddConsumer(typeof(MyConsumer));
Adds a consumer by type.
AddConsumer(typeof(MyConsumer), typeof(MyConsumerDefinition));
Adds a consumer with a matching consumer definition by type.
void AddConsumers(params Type[] types);
Adds the specified consumers and consumer definitions. When consumer definitions are included, each one is added with its matching consumer type.
void AddConsumers(params Assembly[] assemblies);
Adds all consumers and consumer definitions in the specified assembly or assemblies.
void AddConsumers(Func<Type, bool> filter, params Assembly[] assemblies);
Adds the consumers, and any matching consumer definitions, in the specified assembly or assemblies that pass the filter. The filter is only called for consumer types.
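As a minimal sketch of how these registration methods fit together, the following registers consumers from an assembly with a filter. The SubmitOrder message, the SubmitOrderConsumer class, the name-based filter, and the choice of the in-memory transport are assumptions made for illustration only.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    // Register the consumers in this assembly whose type name ends with "Consumer"
    // (hypothetical filter; it is only invoked for consumer types)
    x.AddConsumers(type => type.Name.EndsWith("Consumer"), typeof(SubmitOrderConsumer).Assembly);

    // In-memory transport, assumed here so the sketch needs no broker
    x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
});

// Hypothetical message contract
public record SubmitOrder(Guid OrderId);

// Hypothetical consumer picked up by the assembly scan above
public class SubmitOrderConsumer : IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context) => Task.CompletedTask;
}
```

Scanning an assembly keeps registration in one place as consumers are added, while the filter gives a way to exclude types that should not be registered in a given host.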
Batch Options
AddConsumer<MyBatchConsumer>(cfg =>
{
    cfg.Options<BatchOptions>(options => options
        .SetMessageLimit(100)
        .SetTimeLimit(s: 1)
        .SetTimeLimitStart(BatchTimeLimitStart.FromLast)
        .GroupBy<MyMessage>(x => x.CustomerId)
        .SetConcurrencyLimit(10));
});
Adds a batch consumer and configures the batch options.
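For context, a batch consumer is a consumer of Batch<T>. The sketch below shows roughly what MyBatchConsumer might look like. The shape of MyMessage (a single CustomerId property) and the console output are assumptions for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract with the CustomerId used for grouping
public record MyMessage(Guid CustomerId);

public class MyBatchConsumer : IConsumer<Batch<MyMessage>>
{
    public Task Consume(ConsumeContext<Batch<MyMessage>> context)
    {
        // The batch exposes its length and the ConsumeContext of each message it contains
        for (var i = 0; i < context.Message.Length; i++)
        {
            ConsumeContext<MyMessage> item = context.Message[i];
            Console.WriteLine($"Customer {item.Message.CustomerId}");
        }

        return Task.CompletedTask;
    }
}
```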
Job Options
AddConsumer<MyJobConsumer>(cfg =>
{
    cfg.Options<JobOptions<MyJob>>(options => options
        .SetJobTimeout(TimeSpan.FromMinutes(15))
        .SetConcurrentJobLimit(10));
});
Adds a job consumer and configures the job options.
Configuring Consumers
Consumers are automatically configured when ConfigureEndpoints is called, which is highly recommended. Most of the endpoint configuration can be customized either by using a consumer definition or by specifying the endpoint configuration inline.
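To illustrate the consumer definition route, here is a rough sketch of a definition that sets the endpoint name, a concurrency limit, and a retry policy. The MyMessage and MyConsumer stubs and the endpoint name are hypothetical, and the three-parameter ConfigureConsumer override assumes a recent MassTransit version (earlier versions omit the IRegistrationContext parameter).

```csharp
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message and consumer stubs
public record MyMessage(string Text);

public class MyConsumer : IConsumer<MyMessage>
{
    public Task Consume(ConsumeContext<MyMessage> context) => Task.CompletedTask;
}

public class MyConsumerDefinition : ConsumerDefinition<MyConsumer>
{
    public MyConsumerDefinition()
    {
        // Hypothetical endpoint name, used instead of the name derived from the consumer type
        EndpointName = "my-consumer";

        // Limit the number of messages consumed concurrently
        ConcurrentMessageLimit = 8;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<MyConsumer> consumerConfigurator, IRegistrationContext context)
    {
        // Applied to the receive endpoint when ConfigureEndpoints configures this consumer
        endpointConfigurator.UseMessageRetry(r => r.Interval(5, 1000));
    }
}
```

Registering the pair with AddConsumer<MyConsumer, MyConsumerDefinition>() keeps endpoint settings next to the consumer they apply to, so ConfigureEndpoints can pick them up without any manual receive endpoint.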
To manually configure a consumer on a receive endpoint, use one of the following methods.
Order Matters
Manually configured receive endpoints should be configured before calling ConfigureEndpoints.
cfg.ReceiveEndpoint("manually-configured", e =>
{
    // configure endpoint-specific settings first
    e.SomeEndpointSetting = someValue;

    // configure any required middleware components next
    e.UseMessageRetry(r => r.Interval(5, 1000));

    // configure the consumer last
    e.ConfigureConsumer<MyConsumer>(context);
});

// configure any remaining consumers, sagas, etc.
cfg.ConfigureEndpoints(context);
Configuration Methods
ConfigureConsumer<T>(context);
Configures the consumer on the receive endpoint.
ConfigureConsumer<T>(context, consumer =>
{
    // configure consumer-specific middleware
});
Configures the consumer on the receive endpoint and applies the additional consumer configuration to the consumer pipeline.
ConfigureConsumers(context);
Configures, on the receive endpoint, all consumers that have not already been configured.