Consumers

To understand consumers and how to create one, refer to the Consumers section.

Adding Consumers

Consumers are added inside the AddMassTransit configuration using any of the following methods.

AddConsumer<MyConsumer>();

Adds a consumer.

AddConsumer<MyConsumer, MyConsumerDefinition>();

Adds a consumer with a matching consumer definition.

AddConsumer<MyConsumer, MyConsumerDefinition>(cfg =>
{
    cfg.ConcurrentMessageLimit = 8;
});

Adds a consumer with a matching consumer definition and configures the consumer pipeline.

AddConsumer(typeof(MyConsumer));

Adds a consumer by type.

AddConsumer(typeof(MyConsumer), typeof(MyConsumerDefinition));

Adds a consumer with a matching consumer definition by type.

void AddConsumers(params Type[] types);

Adds the specified consumers and consumer definitions. When consumer definitions are included, each one is added with its matching consumer type.

void AddConsumers(params Assembly[] assemblies);

Adds all consumers and consumer definitions in the specified assembly or assemblies.

void AddConsumers(Func<Type, bool> filter, params Assembly[] assemblies);

Adds the consumers that pass the filter, along with any matching consumer definitions, from the specified assembly or assemblies. The filter is only called for consumer types.
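As a rough sketch of how a filtered assembly scan might look inside AddMassTransit, the following registers every consumer in one assembly whose type name ends in "Consumer". The SubmitOrder message, the SubmitOrderConsumer class, the AddMessaging helper, and the choice of the in-memory transport are assumptions made for illustration only.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace ConsumerRegistrationSketch
{
    // Hypothetical message contract.
    public class SubmitOrder
    {
        public Guid OrderId { get; set; }
    }

    // Hypothetical consumer that the assembly scan below picks up.
    public class SubmitOrderConsumer : IConsumer<SubmitOrder>
    {
        public Task Consume(ConsumeContext<SubmitOrder> context)
        {
            Console.WriteLine($"Order received: {context.Message.OrderId}");
            return Task.CompletedTask;
        }
    }

    public static class MessagingSetup
    {
        // Hypothetical helper; call it from the application's service registration.
        public static IServiceCollection AddMessaging(this IServiceCollection services)
        {
            return services.AddMassTransit(x =>
            {
                // Scan one assembly; the filter is only invoked for consumer types.
                x.AddConsumers(
                    type => type.Name.EndsWith("Consumer"),
                    typeof(SubmitOrderConsumer).Assembly);

                // In-memory transport, chosen only to keep the sketch self-contained.
                x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
            });
        }
    }
}
```

A name-based filter is only one option; any predicate over the consumer type (namespace, attribute, interface) fits the same parameter.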

Batch Options

AddConsumer<MyBatchConsumer>(cfg =>
{
    cfg.Options<BatchOptions>(options => options
        .SetMessageLimit(100)
        .SetTimeLimit(s: 1)
        .SetTimeLimitStart(BatchTimeLimitStart.FromLast)
        .GroupBy<MyMessage>(x => x.CustomerId)
        .SetConcurrencyLimit(10));
});

Adds a batch consumer and configures the batch options.
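For context, a batch consumer is a consumer of Batch<T> rather than of T directly. The following is a minimal sketch of what MyBatchConsumer might look like; the shape of MyMessage (including the type of CustomerId) is an assumption, since it is not shown on this page.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

namespace BatchConsumerSketch
{
    // Hypothetical message; CustomerId corresponds to the GroupBy key in the options.
    public class MyMessage
    {
        public Guid CustomerId { get; set; }
    }

    public class MyBatchConsumer : IConsumer<Batch<MyMessage>>
    {
        public Task Consume(ConsumeContext<Batch<MyMessage>> context)
        {
            Batch<MyMessage> batch = context.Message;

            // Each entry in the batch is the ConsumeContext of one original message.
            for (var i = 0; i < batch.Length; i++)
            {
                ConsumeContext<MyMessage> item = batch[i];
                Console.WriteLine($"Customer: {item.Message.CustomerId}");
            }

            return Task.CompletedTask;
        }
    }
}
```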

Job Options

AddConsumer<MyJobConsumer>(cfg =>
{
    cfg.Options<JobOptions<MyJob>>(options => options
        // maximum run time for a single job (example value)
        .SetJobTimeout(TimeSpan.FromMinutes(15))
        // maximum number of jobs running concurrently on this instance
        .SetConcurrentJobLimit(10));
});

Adds a job consumer and configures the job options.
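For context, a job consumer implements IJobConsumer<TJob> instead of IConsumer<T>. The following is a minimal sketch of what MyJobConsumer might look like; the MyJob properties and the simulated work are assumptions, and the additional job service setup that job consumers require is not shown here.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

namespace JobConsumerSketch
{
    // Hypothetical job message.
    public class MyJob
    {
        public string Path { get; set; }
    }

    public class MyJobConsumer : IJobConsumer<MyJob>
    {
        public async Task Run(JobContext<MyJob> context)
        {
            Console.WriteLine($"Processing: {context.Job.Path}");

            // Stand-in for long-running work; honoring the token lets the job be canceled.
            await Task.Delay(TimeSpan.FromSeconds(5), context.CancellationToken);
        }
    }
}
```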

Configuring Consumers

Consumers are automatically configured when ConfigureEndpoints is called, which is highly recommended. Most of the endpoint configuration can be customized using either a consumer definition or inline endpoint configuration.
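As an illustration of the consumer definition approach, the sketch below sets an endpoint name, a concurrency limit, and a retry policy for one consumer. The MyMessage and MyConsumer types and the endpoint name are hypothetical, and the three-parameter ConfigureConsumer override assumes a recent MassTransit v8 release; older versions use an override without the IRegistrationContext parameter.

```csharp
using System.Threading.Tasks;
using MassTransit;

namespace ConsumerDefinitionSketch
{
    // Hypothetical message and consumer.
    public class MyMessage
    {
        public string Text { get; set; }
    }

    public class MyConsumer : IConsumer<MyMessage>
    {
        public Task Consume(ConsumeContext<MyMessage> context) => Task.CompletedTask;
    }

    public class MyConsumerDefinition : ConsumerDefinition<MyConsumer>
    {
        public MyConsumerDefinition()
        {
            // Override the default endpoint name (hypothetical value).
            EndpointName = "my-consumer";

            // Limit the number of messages this consumer handles concurrently.
            ConcurrentMessageLimit = 8;
        }

        // Assumes the signature used by recent v8 releases.
        protected override void ConfigureConsumer(
            IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<MyConsumer> consumerConfigurator,
            IRegistrationContext context)
        {
            // Retry up to 5 times, 1000 ms apart.
            endpointConfigurator.UseMessageRetry(r => r.Interval(5, 1000));
        }
    }
}
```

Registering the pair with AddConsumer<MyConsumer, MyConsumerDefinition>() lets ConfigureEndpoints apply these settings without any inline endpoint code.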

To manually configure a consumer on a receive endpoint, use one of the following methods.

Order Matters

Manually configured receive endpoints should be configured before calling ConfigureEndpoints.

cfg.ReceiveEndpoint("manually-configured", e =>
{
    // configure endpoint-specific settings first
    e.SomeEndpointSetting = someValue;

    // configure any required middleware components next
    e.UseMessageRetry(r => r.Interval(5, 1000));

    // configure the consumer last
    e.ConfigureConsumer<MyConsumer>(context);
});

// configure any remaining consumers, sagas, etc.
cfg.ConfigureEndpoints(context);

Configuration Methods

ConfigureConsumer<T>(context);

Configures the consumer on the receive endpoint.

ConfigureConsumer<T>(context, consumer =>
{
    // configure consumer-specific middleware
});

Configures the consumer on the receive endpoint and applies the additional consumer configuration to the consumer pipeline.

ConfigureConsumers(context);

Configures all registered consumers that have not already been configured, adding them to the receive endpoint.
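To show where these calls sit in a full registration, the sketch below places one consumer on a manually configured endpoint and leaves the other to ConfigureEndpoints. The message types, the OtherConsumer class, the AddMessaging helper, and the in-memory transport are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace ManualEndpointSketch
{
    // Hypothetical messages and consumers.
    public class OrderSubmitted { public Guid OrderId { get; set; } }
    public class OrderCancelled { public Guid OrderId { get; set; } }

    public class MyConsumer : IConsumer<OrderSubmitted>
    {
        public Task Consume(ConsumeContext<OrderSubmitted> context) => Task.CompletedTask;
    }

    public class OtherConsumer : IConsumer<OrderCancelled>
    {
        public Task Consume(ConsumeContext<OrderCancelled> context) => Task.CompletedTask;
    }

    public static class MessagingSetup
    {
        public static IServiceCollection AddMessaging(this IServiceCollection services)
        {
            return services.AddMassTransit(x =>
            {
                x.AddConsumer<MyConsumer>();
                x.AddConsumer<OtherConsumer>();

                x.UsingInMemory((context, cfg) =>
                {
                    // Manually configured endpoint comes first.
                    cfg.ReceiveEndpoint("manually-configured", e =>
                    {
                        e.UseMessageRetry(r => r.Interval(5, 1000));
                        e.ConfigureConsumer<MyConsumer>(context);
                    });

                    // Configures the remaining consumers (here, OtherConsumer).
                    cfg.ConfigureEndpoints(context);
                });
            });
        }
    }
}
```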