Consumers

To understand consumers and how to create one, refer to the Consumers section.

High-level concepts covered in this configuration section include:

| Concept | Description |
|---|---|
| Consumer | A class that consumes one or more message types, one for each implementation of IConsumer<T> |
| Batch Consumer | A class that consumes multiple messages in batches, by implementing IConsumer<Batch<T>> |
| Job Consumer | A class that consumes a job message, specified by the IJobConsumer<T> interface |
| Consumer Definition | A class, derived from ConsumerDefinition<TConsumer>, that configures settings and the consumer's receive endpoint |
| Receive Endpoint | Receives messages from a broker queue and delivers those messages to consumer types configured on the receive endpoint |
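
To make the first row concrete, here is a minimal sketch of a consumer class. The message type MyMessage and its CustomerId property are assumptions made for illustration; only the IConsumer<T> interface and its Consume method come from MassTransit.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, defined only for this illustration.
public record MyMessage
{
    public string CustomerId { get; init; } = "";
}

// A consumer implements IConsumer<T> once per message type it handles.
public class MyConsumer : IConsumer<MyMessage>
{
    public Task Consume(ConsumeContext<MyMessage> context)
    {
        // The deserialized message is available on context.Message.
        Console.WriteLine($"Received message for customer {context.Message.CustomerId}");
        return Task.CompletedTask;
    }
}
```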

Consumers can be added in several ways, allowing either a simple or a fine-grained approach to registration. Consumers are added inside the AddMassTransit configuration, before the transport is configured.

using MassTransit;

services.AddMassTransit(x =>
{
    x.AddConsumer<MyConsumer>();

    x.Using[Transport]((context, cfg) => 
    {
        // transport, middleware, other configuration
    
        cfg.ConfigureEndpoints(context);
     });
});

Adding Consumers

Adds a single consumer, with all defaults.

AddConsumer<MyConsumer>();
AddConsumer(typeof(MyConsumer));

Adds a consumer, with a consumer definition.

AddConsumer<MyConsumer, MyConsumerDefinition>();
AddConsumer(typeof(MyConsumer), typeof(MyConsumerDefinition));

Adds a consumer with a matching consumer definition and configures the consumer pipeline.

AddConsumer<MyConsumer, MyConsumerDefinition>(cfg =>
{
    cfg.ConcurrentMessageLimit = 8;
});

Adds the specified consumers and consumer definitions. When consumer definitions are included they will be added with the matching consumer type.

void AddConsumers(params Type[] types);

Adds all consumers and consumer definitions in the specified assembly or assemblies.

void AddConsumers(params Assembly[] assemblies);

Adds the consumers and any matching consumer definitions in the specified assembly or assemblies that pass the filter. The filter is only called for consumer types.

void AddConsumers(Func<Type, bool> filter, params Assembly[] assemblies);
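
As a rough sketch of the filtered overload in use, the following registers only the consumers from one namespace. The class name ConsumerRegistration, the namespace prefix "MyApp.Orders", and the choice of the in-memory transport are assumptions made for illustration.

```csharp
using System;
using System.Reflection;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class ConsumerRegistration
{
    public static void AddMessaging(IServiceCollection services)
    {
        // Assumption: the consumers live in the assembly that contains this class.
        Assembly assembly = typeof(ConsumerRegistration).Assembly;

        services.AddMassTransit(x =>
        {
            // The filter is called for consumer types only; definitions that
            // match an accepted consumer are added along with it.
            x.AddConsumers(
                type => type.Namespace?.StartsWith("MyApp.Orders", StringComparison.Ordinal) == true,
                assembly);

            // The in-memory transport stands in for whichever transport is used.
            x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
        });
    }
}
```

Scanning an assembly keeps registration short as consumers are added, while the filter leaves room to exclude consumers that belong to a different service hosted from the same assembly.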

Batch Options

If you want your consumer to process multiple messages at a time, you can configure a Batch Consumer. This is a consumer that implements IConsumer<Batch<TMessage>>.

AddConsumer<MyBatchConsumer>(cfg =>
{
    cfg.Options<BatchOptions>(options => options
        .SetMessageLimit(100)
        .SetTimeLimit(s: 1)
        .SetTimeLimitStart(BatchTimeLimitStart.FromLast)
        .GroupBy<MyMessage, string>(x => x.CustomerId)
        .SetConcurrencyLimit(10));
});
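
| Property | Type | Default | Description |
|---|---|---|---|
| MessageLimit | int | 10 | Maximum number of messages in a batch |
| ConcurrencyLimit | int | 1 | Number of concurrent batches |
| TimeLimit | TimeSpan | 1 sec | Maximum time to wait before delivering a partial batch |
| TimeLimitStart | BatchTimeLimitStart | FromFirst | Starting point of the time limit, measured from the first or the last message in the batch |
| GroupKeyProvider | object? | null | The property to group by |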
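
The consumer side of a batch can be sketched as follows. The MyMessage contract and its CustomerId property are assumed here to match the GroupBy example above; the loop uses the Length property and indexer of Batch<T>, where each entry is the ConsumeContext<T> of one message.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, defined only for this illustration.
public record MyMessage
{
    public string CustomerId { get; init; } = "";
}

public class MyBatchConsumer : IConsumer<Batch<MyMessage>>
{
    public Task Consume(ConsumeContext<Batch<MyMessage>> context)
    {
        Batch<MyMessage> batch = context.Message;

        // Each entry in the batch carries its own consume context.
        for (int i = 0; i < batch.Length; i++)
        {
            ConsumeContext<MyMessage> item = batch[i];
            Console.WriteLine($"Batch item {i}: customer {item.Message.CustomerId}");
        }

        return Task.CompletedTask;
    }
}
```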

Job Options

If your consumer needs to work for an extended period of time, greater than a second, you may want to register the consumer as a job consumer. You can read more about this feature in the Job Consumer pattern section.

AddConsumer<MyJobConsumer>(cfg =>
{
    cfg.Options<JobOptions<MyJob>>(options => options
        .SetJobTimeout(TimeSpan.FromMinutes(15))
        .SetConcurrentJobLimit(10)
        .SetRetry(r => r.Interval(5,30000)));
});

| Property | Type | Default | Description |
|---|---|---|---|
| JobTimeout | TimeSpan | 5 minutes | Maximum time the job is allowed to run |
| ConcurrentJobLimit | int | 1 | Number of concurrently executing jobs |
| RetryPolicy | IRetryPolicy | None | How failures should be retried, if at all |
| JobTypeName | string | Job Type | Overrides the default job type name used in the JobTypeSaga table (display only) |
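
For reference, a job consumer class might look like the sketch below. The MyJob contract and its Path property are hypothetical, and the delay merely simulates long-running work. The job service configuration that job consumers also need is not shown here.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical job contract, defined only for this illustration.
public record MyJob
{
    public string Path { get; init; } = "";
}

public class MyJobConsumer : IJobConsumer<MyJob>
{
    public async Task Run(JobContext<MyJob> context)
    {
        // context.Job holds the job message.
        Console.WriteLine($"Processing {context.Job.Path}");

        // Simulated long-running work. Passing the job's cancellation token
        // lets the job stop when it is canceled or exceeds JobTimeout.
        await Task.Delay(TimeSpan.FromSeconds(30), context.CancellationToken);
    }
}
```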

Retry Policies

  • None: No retries
  • Immediate: retry N times, with an optional exception filter
  • Intervals: retry N times, with a pause between and an optional exception filter
  • Incremental: retry N times, with an increasing pause between and an optional exception filter
  • Exponential: retry N times, with an ever increasing pause between and an optional exception filter
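
The policies listed above map onto methods of the retry configurator. The sketch below shows one call per policy; the class name, retry counts, and intervals are arbitrary illustrative values, and each helper is meant to be used on its own rather than combined on one endpoint.

```csharp
using System;
using MassTransit;

public static class RetryPolicyExamples
{
    // None: failures are not retried.
    public static void UseNone(IReceiveEndpointConfigurator e) =>
        e.UseMessageRetry(r => r.None());

    // Immediate: retry up to 5 times with no pause.
    public static void UseImmediate(IReceiveEndpointConfigurator e) =>
        e.UseMessageRetry(r => r.Immediate(5));

    // Interval: retry up to 5 times, pausing 1 second between attempts.
    public static void UseInterval(IReceiveEndpointConfigurator e) =>
        e.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(1)));

    // Incremental: start at 1 second and add 2 seconds per attempt.
    public static void UseIncremental(IReceiveEndpointConfigurator e) =>
        e.UseMessageRetry(r => r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)));

    // Exponential: pauses grow between a 1 second minimum and a 30 second maximum.
    public static void UseExponential(IReceiveEndpointConfigurator e) =>
        e.UseMessageRetry(r => r.Exponential(5, TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(5)));

    // Exception filter: only retry when a TimeoutException is thrown.
    public static void UseFiltered(IReceiveEndpointConfigurator e) =>
        e.UseMessageRetry(r =>
        {
            r.Handle<TimeoutException>();
            r.Immediate(3);
        });
}
```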

Configuring Endpoints

By default, MassTransit requires no explicit endpoint configuration; endpoints can be created automatically by calling ConfigureEndpoints. You can customize this behavior using a ConsumerDefinition or by specifying the endpoint configuration inline.

using MassTransit;

services.AddMassTransit(x =>
{
    // Step 1: Add Consumers Here

    // Step 2: Select a Transport
    x.Using[Transport]((context, cfg) => {
        // Step 3: Configure the Transport

        // Step 4: Configure Endpoints
        // All consumers registered in step 1, will get
        // default endpoints created.
        cfg.ConfigureEndpoints(context);
    });
});

Customized Endpoints

To manually configure a consumer on a receive endpoint, use one of the following methods. A common reason to do this:

  • Group consumers onto a specific queue, instead of the default of one queue per consumer

Order Matters: Manually configured receive endpoints should be configured before calling ConfigureEndpoints.

cfg.ReceiveEndpoint("manually-configured", e =>
{
    // configure endpoint-specific settings first
    // (SomeEndpointSetting is a placeholder for a transport-specific setting)
    e.SomeEndpointSetting = someValue;
    
    // configure any required middleware components next
    e.UseMessageRetry(r => r.Interval(5, 1000));
    
    // configure the consumer last
    e.ConfigureConsumer<MyConsumer>(context);
});

// configure any remaining consumers, sagas, etc.
cfg.ConfigureEndpoints(context);
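
Putting the pieces together, the following sketch groups two consumers onto a single queue. The message and consumer types, the queue name "order-events", and the in-memory transport are all assumptions made for illustration.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical contracts and consumers, defined only for this illustration.
public record OrderSubmitted;
public record OrderCancelled;

public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
    public Task Consume(ConsumeContext<OrderSubmitted> context) => Task.CompletedTask;
}

public class OrderCancelledConsumer : IConsumer<OrderCancelled>
{
    public Task Consume(ConsumeContext<OrderCancelled> context) => Task.CompletedTask;
}

public static class GroupedEndpointExample
{
    public static void AddMessaging(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<OrderSubmittedConsumer>();
            x.AddConsumer<OrderCancelledConsumer>();

            x.UsingInMemory((context, cfg) =>
            {
                // Both consumers share one queue instead of one queue each.
                cfg.ReceiveEndpoint("order-events", e =>
                {
                    e.ConfigureConsumer<OrderSubmittedConsumer>(context);
                    e.ConfigureConsumer<OrderCancelledConsumer>(context);
                });

                // Consumers already configured above are skipped here.
                cfg.ConfigureEndpoints(context);
            });
        });
    }
}
```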

Endpoint configuration is specific to each transport.

Consumer Configuration

ConfigureConsumer<T>(context);

Configures the consumer on the receive endpoint.

ConfigureConsumer<T>(context, consumer => 
{
    // configure consumer-specific middleware
});

Configures the consumer on the receive endpoint and applies the additional consumer configuration to the consumer pipeline.

ConfigureConsumers(context);

Configures all consumers that haven't been configured on the receive endpoint.

Consumer Definitions

Inside a consumer definition, you can control all of the settings for a consumer and its associated endpoint.

public class SubmitOrderConsumerDefinition :
    ConsumerDefinition<SubmitOrderConsumer>
{
    public SubmitOrderConsumerDefinition()
    {
        // override the default endpoint name, for whatever reason
        EndpointName = "ha-submit-order";

        // limit the number of messages consumed concurrently
        // this applies to the consumer only, not the endpoint
        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(r => r.Interval(5, 1000));
        endpointConfigurator.UseInMemoryOutbox();
    }
}

Endpoint Options

| Concept | Type | Description |
|---|---|---|
| EndpointDefinition | IEndpointDefinition<T> | ?? |
| EndpointName | string | The name of the queue that will be generated |

Consumer Options

| Concept | Type | Description |
|---|---|---|
| ConcurrentMessageLimit | int | The number of messages this consumer can process concurrently |