Job Consumers

In MassTransit, when a message is delivered from the broker to a consumer, it gets locked by the broker until the consumer completes processing. This lock ensures that the message won’t be delivered to other consumers, even on different bus instances reading from the same queue (the competing consumer pattern). Once the consumer completes, the message is acknowledged and removed from the queue. However, if the connection to the broker is lost, the message may be unlocked and redelivered to another consumer. This works well for most cases where the processing time is short.

But what happens when you need to process a message that takes a long time? In scenarios where a task might take several minutes, hours, or even longer, a standard consumer may not be the best fit. This is where Job Consumers come in.

What are Job Consumers?

A Job Consumer in MassTransit is a specialized type of consumer designed to handle long-running tasks, often referred to as jobs. Unlike traditional consumers that lock and process messages quickly, job consumers are built to execute tasks that may take an extended time to complete. They provide additional functionality to manage long-running tasks, including handling retries, concurrency, and ensuring job completion even in the face of system interruptions.

Job consumers are implemented using the IJobConsumer<TJob> interface, where TJob represents the message type that defines the job. This makes them ideal for scenarios where jobs could take minutes or even hours, such as video processing, large data transformations, or background tasks that don’t fit within the typical message lock period provided by brokers like RabbitMQ, Azure Service Bus, or Amazon SQS.

One critical difference is that job consumers require a saga repository to store and manage the job state. They decouple the task of consuming the message from the message broker, allowing more flexibility for handling tasks asynchronously and managing retries without relying solely on broker mechanisms.

Why Should You Use a Job Consumer?

The key question to ask is: How long does the task take to complete?

  • If your task takes less than 5 minutes, a standard consumer is usually sufficient. Brokers like RabbitMQ, Azure Service Bus, or Amazon SQS can hold a message lock for around 5 minutes, which is long enough for most tasks.
  • If your task exceeds 5 minutes, that’s when you should consider using a job consumer. When tasks exceed the broker’s lock time, you risk message reprocessing or failures due to lock timeouts. Job consumers are specifically designed to handle these scenarios without worrying about broker timeouts.

That said, it’s important to recognize that job consumers introduce some computational overhead due to the additional bookkeeping required to manage job state, retries, and concurrency. Make sure the benefits outweigh the extra complexity before adopting job consumers for your long-running tasks.

MassTransit/Sample-JobConsumers

MassTransit includes a job service that keeps track of each job, assigns jobs to service instances, and schedules job retries when necessary. The job service uses three saga state machines and the default configuration uses an in-memory saga repository, which is not durable. When using job consumers for production use cases, configuring durable saga repositories is highly recommended to avoid possible message loss. Check out the sample project on GitHub, which includes the Entity Framework configuration for the job service state machines.

Implementation

To use job consumers, you'll need to create a consumer that implements the IJobConsumer<TJob> interface.

public interface IJobConsumer<in TJob> :
    IConsumer
    where TJob : class
{
    Task Run(JobContext<TJob> context);
}

public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // Simulate 30 seconds of work; the delay is aborted if the job is canceled
        await Task.Delay(30000, context.CancellationToken);
    }
}

Job Context

In a job consumer, JobContext<TJob> is the job consumer version of ConsumeContext<T>. Since job consumers do not run while a message is in flight or locked, a separate context is used. In addition to the standard message context properties, the job context also includes the following properties.

| Property | Type | Description |
|---|---|---|
| JobId | Guid | The job's identifier assigned when the job was submitted |
| AttemptId | Guid | Uniquely identifies this job attempt |
| RetryAttempt | int | If greater than zero, the retry attempt of the job |
| LastProgressValue | long? | If a previous job attempt updated the progress, the last updated value stored for the job |
| LastProgressLimit | long? | If a previous job attempt updated the progress, the last updated limit stored for the job |
| ElapsedTime | TimeSpan | How long the current job attempt has been running |
| JobProperties | Dictionary | The properties added when the job was submitted |
| JobTypeProperties | Dictionary | The properties configured by the JobOptions<T> |
| InstanceProperties | Dictionary | The properties configured by the JobOptions<T> on a specific job consumer instance |
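
As a minimal sketch of how these properties might be read, the consumer below writes a few of them to the console at the start of a run. The shape of the ConvertVideo message (a single Path property) and the use of Console.WriteLine are assumptions made for illustration; only the property names come from the table above.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Assumed shape of the job message, for illustration only
public record ConvertVideo
{
    public string? Path { get; init; }
}

public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // Identify the job and this particular attempt
        Console.WriteLine($"Job {context.JobId}, attempt {context.AttemptId}");

        // RetryAttempt is greater than zero only when the job is being retried
        if (context.RetryAttempt > 0)
        {
            Console.WriteLine($"Retry attempt {context.RetryAttempt}");

            // The progress values are null unless a previous attempt saved progress
            if (context.LastProgressValue.HasValue && context.LastProgressLimit.HasValue)
                Console.WriteLine($"Previous progress: {context.LastProgressValue} of {context.LastProgressLimit}");
        }

        // Placeholder for the actual work
        await Task.Delay(TimeSpan.FromSeconds(30), context.CancellationToken);
    }
}
```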

Job Cancellation

When a job is canceled, the CancellationToken on JobContext is canceled. Job consumers should check for cancellation using IsCancellationRequested and, when it is safe to cancel, call:

context.CancellationToken.ThrowIfCancellationRequested()

This ensures the job is properly reported as canceled to the job saga state machines.
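
A rough sketch of where such a check might sit: the consumer below splits its work into units and tests the token at the boundary between them, which is assumed to be a safe point to stop. The ProcessChunk helper, the chunk count, and the shape of ConvertVideo are hypothetical and stand in for real work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Assumed shape of the job message, for illustration only
public record ConvertVideo
{
    public string? Path { get; init; }
}

public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        for (var chunk = 0; chunk < 100; chunk++)
        {
            // Between units of work nothing is half-finished, so it is safe to stop here
            context.CancellationToken.ThrowIfCancellationRequested();

            await ProcessChunk(chunk, context.CancellationToken);
        }
    }

    // Hypothetical unit of work; passing the token lets the delay itself be canceled
    static Task ProcessChunk(int chunk, CancellationToken cancellationToken)
    {
        return Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
    }
}
```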

When the bus is stopped and there are job consumers configured on the bus, any running jobs are canceled. Canceled jobs will be restarted by the next available job consumer bus instance (added in MassTransit v8.3.0).

Job Progress

New in MassTransit v8.3.0

Job consumers can track progress and that progress is saved by the job saga. If a job is canceled or faults, the most recently saved progress is included in the JobContext passed to the job consumer if the job is retried.

To save progress, call SetJobProgress as shown below.

public class ConvertVideoJobConsumer : 
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // Total amount of work, here the size of the input file in bytes
        long length = new FileInfo(context.Job.Path).Length;
        
        await context.SetJobProgress(0, length);
        
        for (long index = 1; index <= length; index++)
        {
            // do something
        
            await context.SetJobProgress(index, length);
        }
    }
}

Job State

New in MassTransit v8.3.0

Job consumers can save state in the job saga. In the event that a job is canceled or faults, when the job is retried the previously saved state will be included in the JobContext passed to the job consumer.

To save the job state when a job is canceled:

public class ConvertVideoJobConsumer : 
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // Total amount of work, here the size of the input file in bytes
        long length = new FileInfo(context.Job.Path).Length;
        
        long index = 1;
        try
        {
            await context.SetJobProgress(0, length);
            
            for (; index <= length; index++)
            {
                context.CancellationToken.ThrowIfCancellationRequested();
                
                // do something
            
                await context.SetJobProgress(index, length);
            }
        }
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
        {
            // The current index may not have completed, so record the one before it
            await context.SaveJobState(new ConsumerState { LastIndex = index - 1 });
            throw;
        }
    }
    
    class ConsumerState
    {
        public long LastIndex { get; set; }
    }
}

When the job is started, the consumer can check if a previously saved job state exists, and use it to continue where processing left off.

public class ConvertVideoJobConsumer : 
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
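        // Total amount of work, here the size of the input file in bytes
        long length = new FileInfo(context.Job.Path).Length;
        
        // Resume after the last saved index, or start from the beginning
        long index = context.TryGetJobState(out ConsumerState? state)
            ? state.LastIndex + 1
            : 1;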

        // elided, see above
    }
}

The job state type (in this case, ConsumerState) is only relevant to the job consumer and is stored as a serialized dictionary in the job saga.

Configuration

The example below registers a job consumer whose endpoint is configured automatically by ConfigureEndpoints.

services.AddMassTransit(x =>
{
    x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
    {
        cfg.Options<JobOptions<ConvertVideo>>(options => options
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    });

    x.AddDelayedMessageScheduler();
    
    x.SetKebabCaseEndpointNameFormatter();

    // in this case, just use the in-memory saga repository, 
    // but an actual database should be used
    x.SetInMemorySagaRepositoryProvider();
    
    x.AddJobSagaStateMachines();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UseDelayedMessageScheduler();
        
        cfg.ConfigureEndpoints(context);
    });
});

The old syntax of creating a service instance and manually configuring job consumers is deprecated and will no longer be supported.

In this example, the job timeout as well as the number of concurrent jobs allowed is configured via JobOptions<T> when configuring the consumer. The job options can also be specified using a consumer definition in the same way.
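
As a sketch of the consumer definition approach, the class below applies the same options from inside a definition. It assumes MassTransit v8.1 or later, where ConfigureConsumer receives an IRegistrationContext, and it reuses the ConvertVideoJobConsumer and ConvertVideo types from the earlier examples; the definition class name is hypothetical.

```csharp
using System;
using MassTransit;

public class ConvertVideoJobConsumerDefinition :
    ConsumerDefinition<ConvertVideoJobConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ConvertVideoJobConsumer> consumerConfigurator,
        IRegistrationContext context)
    {
        // Same options as the inline configuration shown above
        consumerConfigurator.Options<JobOptions<ConvertVideo>>(options => options
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    }
}
```

The definition would then be registered together with the consumer, for example with x.AddConsumer<ConvertVideoJobConsumer, ConvertVideoJobConsumerDefinition>(), in place of the inline Options call.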

Job Options

There are several job options that can be configured, including:

| Option | Type | Description |
|---|---|---|
| ConcurrentJobLimit | int | The number of concurrent jobs allowed to run on a given instance |
| JobTimeout | TimeSpan | How long a job consumer is allowed to run before the job is canceled (via the CancellationToken) |
| JobCancellationTimeout | TimeSpan | How long after a job consumer is canceled to wait before considering the job canceled regardless of whether it has completed |
| JobTypeName | string | Override the default job type name (optional, not really recommended) |
| RetryPolicy | IRetryPolicy | The retry policy applied when a job faults |
| ProgressBuffer.TimeLimit | TimeSpan | How often any progress updates should be reported to the job saga |
| ProgressBuffer.UpdateLimit | int | How many progress updates should be reported before updating the job saga |
| JobTypeProperties | Dictionary | Properties associated with the job type (should be the same on every job consumer bus instance) |
| InstanceProperties | Dictionary | Properties associated with the currently running job consumer bus instance |

Job Saga Options

When adding the job saga state machines, the JobSagaOptions can also be configured.

x.AddJobSagaStateMachines(options =>
{
    options.FinalizeCompleted = true;
    options.ConcurrentMessageLimit = 32;
    options.HeartbeatTimeout = TimeSpan.FromMinutes(5);
    options.SlotWaitTime = TimeSpan.FromSeconds(30);
    options.SuspectJobRetryCount = 2;
    options.SuspectJobRetryDelay = TimeSpan.FromMinutes(1);
});

| Option | Type | Description |
|---|---|---|
| FinalizeCompleted | bool | Automatically remove completed job sagas |
| ConcurrentMessageLimit | int | Specifies the concurrency of the job service sagas (not the actual jobs) |
| HeartbeatTimeout | TimeSpan | Period of time after which a job consumer bus instance is removed from the active list if no heartbeat has been received |
| SlotWaitTime | TimeSpan | How long a job waits for an available job slot between attempts |
| SuspectJobRetryCount | int | How many times to retry a job that becomes suspect (not faulted, but the job consumer bus instance stops responding) |
| SuspectJobRetryDelay | TimeSpan | How long a job should wait before retrying when the job became suspect and SuspectJobRetryCount is > 0 |

Job Commands

Submit Job

To submit a job, call the SubmitJob extension method on an IPublishEndpoint as shown below. This is a fire-and-forget method; no response is sent.

[HttpPut("{path}")]
public async Task<IActionResult> FireAndForgetSubmitJob(string path, [FromServices] IPublishEndpoint publishEndpoint)
{
    var jobId = await publishEndpoint.SubmitJob<ConvertVideo>(new ConvertVideo
    {
        Path = path
    });

    return Ok(new
    {
        jobId,
        path
    });
}

To wait for a response indicating that the job submission was successful (not strictly necessary, but commonly done), submit the job through the request client, IRequestClient<T>, using the SubmitJob extension method as shown below. The RequestId generated by the request client will be used as the JobId.

[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<ConvertVideo> client)
{    
    var jobId = await client.SubmitJob(new ConvertVideo
    {
        Path = path
    });

    return Ok(new
    {
        jobId,
        path
    });
}

Additionally, a jobId can be specified if the IRequestClient<SubmitJob<TJob>> interface is used instead.

[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client)
{
    var jobId = NewId.NextGuid();
        
    await client.SubmitJob(jobId, new ConvertVideo
    {
        Path = path
    });

    return Ok(new
    {
        jobId,
        path
    });
}

To submit a job including job properties (such as a tenantId or other property value typically reflecting some cross-cutting concern or environmental setting such as a data center location, country, etc.), use the overload as shown.

[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client)
{
    var jobId = NewId.NextGuid();
        
    await client.SubmitJob(jobId, new ConvertVideo
    {
        Path = path
    }, x => x.Set("TenantId", _tenantId));

    return Ok(new
    {
        jobId,
        path
    });
}

Cancel Job

To cancel a submitted job, call the CancelJob extension method on an IPublishEndpoint as shown.

[HttpPut("{jobId}")]
public async Task<IActionResult> CancelJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.CancelJob(jobId);

    return Ok();
}

Retry Job

To retry a faulted or canceled job, call the RetryJob extension method on an IPublishEndpoint as shown.

[HttpPut("{jobId}")]
public async Task<IActionResult> RetryJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.RetryJob(jobId);

    return Ok();
}

Finalize Job

When a job is canceled or faults, the job is not removed from the saga repository. To remove a job in the Canceled or Faulted state, use the FinalizeJob method as shown.

[HttpPut("{jobId}")]
public async Task<IActionResult> FinalizeJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.FinalizeJob(jobId);

    return Ok();
}

Schedule Job

New in MassTransit v8.3.0

By default, submitted jobs will run as soon as possible. Jobs can also be scheduled by specifying a start time, using the ScheduleJob method.

[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client)
{
    await client.ScheduleJob(DateTimeOffset.Now.AddMinutes(15), new ConvertVideo
    {
        Path = path
    });

    return Ok(new
    {
        path
    });
}

Recurring Jobs

New in MassTransit v8.3.0

MassTransit supports recurring jobs, which are useful when a consumer needs to run on a predefined schedule. Recurring jobs use regular job consumers and are scheduled using the transport's built-in message scheduling and the job saga state machines.

Recurring jobs are entirely built into MassTransit, and require no additional application infrastructure. This means Quartz.NET or Hangfire are NOT required.

Recurring jobs are configured using a cron expression. Cron expressions are a well-known way to define a schedule and can be built using any of the online cron expression builders. Cron expressions can be very expressive. For instance, 0 0,15,30,45 * * * 1,3,5 would mean at 0, 15, 30, and 45 minutes past the hour, only on Monday, Wednesday, and Friday.
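
To make the example expression easier to read, the small program below splits it into its six fields and labels each one. The field names are an assumption inferred from the description above (a leading seconds field, then minute, hour, day of month, month, and day of week). The program is plain C# and does not use any MassTransit API.

```csharp
using System;

// Assumed field order for a six-field cron expression with a leading seconds field
string[] fieldNames = { "second", "minute", "hour", "day of month", "month", "day of week" };

string expression = "0 0,15,30,45 * * * 1,3,5";
string[] fields = expression.Split(' ', StringSplitOptions.RemoveEmptyEntries);

if (fields.Length != fieldNames.Length)
    throw new FormatException($"Expected {fieldNames.Length} fields but found {fields.Length}.");

// Print each field next to its assumed meaning
for (int i = 0; i < fields.Length; i++)
    Console.WriteLine($"{fieldNames[i],-12} = {fields[i]}");
```

Read this way, the minute field carries the 0,15,30,45 list and the day-of-week field carries 1,3,5, which matches the schedule described in the text.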

Schedule Recurring Job

To schedule a recurring job, use the AddOrUpdateRecurringJob method as shown below.

public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint)
{
    await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance",
        new RoutineMaintenanceCommand(), "0 0,15,30,45 * * * 1,3,5");
}

public record RoutineMaintenanceCommand;

A simple expression builder is also available, which can be used to generate a cron expression.

public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint)
{
    await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance",
        new RoutineMaintenanceCommand(), x => x.Every(minutes: 15));
}

Recurring jobs can also be confined to a period of time, using start and end dates. Specifying a start date will apply the cron expression to run at the first opportunity after the specified start date. Specifying an end date will ensure the job is not run after that date.

public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint)
{
    await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance",
        new RoutineMaintenanceCommand(), x =>
        {
            x.Start = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero);
            x.End = new DateTimeOffset(2025, 1, 1, 0, 0, 0, TimeSpan.Zero);
            x.Every(minutes: 30);
        });
}

Calling AddOrUpdateRecurringJob can be used to update the job message or the schedule. If the schedule is changed, the next run will be rescheduled using the newly specified cron expression.

Run Recurring Job

To force a recurring job to run immediately, use the RunRecurringJob method.

public async Task RunJobNow(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.RunRecurringJob<RoutineMaintenanceCommand>("RoutineMaintenance");
}

Once the job has completed, the next job run will be scheduled using the previously supplied cron expression.

Job Saga Endpoints

The job saga state machines are configured on their own endpoints, using the configured endpoint name formatter. These endpoints are required on at least one bus instance, but they do not need to be configured on every bus instance. In the configuration example above, the job saga state machine endpoints are configured. A standard ConfigureEndpoints call will suffice to host the job consumers without the job saga state machines.
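
As a sketch of a bus instance that hosts only the job consumer, the configuration below repeats the earlier example without the job saga state machines. It assumes another bus instance hosts the job sagas with a durable repository, and that every instance uses the same endpoint name formatter so that endpoint names agree. Whether a message scheduler is also needed on such an instance is not stated here, so it is left out.

```csharp
services.AddMassTransit(x =>
{
    x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
    {
        cfg.Options<JobOptions<ConvertVideo>>(options => options
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    });

    x.SetKebabCaseEndpointNameFormatter();

    // No AddJobSagaStateMachines() call: the job sagas are assumed to run on another bus instance

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
```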

Job Saga Repository

A persistent saga repository should be used with job consumers, and should be configured when adding the job saga state machines. In the example below, Entity Framework Core is configured, along with the Postgres lock statement provider (required when using PostgreSQL).

x.AddJobSagaStateMachines()
    .EntityFrameworkRepository(r =>
    {
        r.ExistingDbContext<JobServiceSagaDbContext>();
        r.UsePostgres();
    });

For a more detailed example of configuring the job saga state machine endpoints, including persistent storage, see the MassTransit/Sample-JobConsumers sample mentioned above.

Job Saga State Machines

Job consumers use three saga state machines to orchestrate jobs and keep track of available job consumer bus instances.

| Saga | Description |
|---|---|
| JobSaga | Orchestrates each job, including scheduling, retry, and failure handling |
| JobAttemptSaga | Orchestrates each job attempt, communicating directly with the job consumer bus instances |
| JobTypeSaga | Keeps track of available job instances and allocates job slots to waiting jobs |

Job Saga States

A job can be in one of the following states:

  1. Initial
  2. Final
  3. Submitted
  4. Waiting to Start
  5. Waiting for Slot
  6. Started
  7. Completed
  8. Faulted
  9. Canceled
  10. Starting Job Attempt
  11. Allocating Job Slot
  12. Waiting to Retry
  13. Cancellation Pending

Job Attempt Saga States

A job attempt can be in one of the following states:

  1. Initial
  2. Final
  3. Starting
  4. Running
  5. Faulted
  6. Checking Status
  7. Suspect

Job Type Saga States

A job type can be in one of the following states:

  1. Initial
  2. Final
  3. Active
  4. Idle

Job Distribution Strategy

New in MassTransit v8.3.0

To support more complex job consumer scenarios, MassTransit enables the use of a custom job distribution strategy. This strategy is employed by the job type saga to decide which job consumer bus instance should handle a particular job. By configuring JobTypeProperties and InstanceProperties within JobOptions<T>, you can control how jobs are assigned to specific consumer instances. For example, you might allocate jobs from premium customers to consumer instances running on premium hardware, ensuring that resource-intensive jobs are handled by more capable instances.

To use a custom strategy, create a class that implements IJobDistributionStrategy.

public class MachineTypeJobDistributionStrategy :
    IJobDistributionStrategy
{
    public Task<ActiveJob?> IsJobSlotAvailable(ConsumeContext<AllocateJobSlot> context, JobTypeInfo jobTypeInfo)
    {
        object? strategy = null;
        jobTypeInfo.Properties?.TryGetValue("DistributionStrategy", out strategy);

        return strategy switch
        {
            "MachineType" => MachineType(context, jobTypeInfo),
            _ => DefaultJobDistributionStrategy.Instance.IsJobSlotAvailable(context, jobTypeInfo)
        };
    }

    Task<ActiveJob?> MachineType(ConsumeContext<AllocateJobSlot> context, JobTypeInfo jobTypeInfo)
    {
        var customerType = context.GetHeader("CustomerType");
        
        var machineType = customerType switch 
        {
            "Premium" => "S-Class",
            _ => "E-Class"
        };

        var instances = from i in jobTypeInfo.Instances
            join a in jobTypeInfo.ActiveJobs on i.Key equals a.InstanceAddress into ai
            where (ai.Count() < jobTypeInfo.ConcurrentJobLimit
                    && string.IsNullOrEmpty(machineType))
                || (i.Value.Properties.TryGetValue("MachineType", out var mt) && mt is string mtext && mtext == machineType)
            orderby ai.Count(), i.Value.Used
            select new
            {
                Instance = i.Value,
                InstanceAddress = i.Key,
                InstanceCount = ai.Count()
            };

        var firstInstance = instances.FirstOrDefault();
        if (firstInstance == null)
            return Task.FromResult<ActiveJob?>(null);

        return Task.FromResult<ActiveJob?>(new ActiveJob
        {
            JobId = context.Message.JobId,
            InstanceAddress = firstInstance.InstanceAddress
        });
    }
}

Then register the strategy using the TryAddJobDistributionStrategy method:

services.TryAddJobDistributionStrategy<MachineTypeJobDistributionStrategy>();

The strategy must be registered where the job saga state machines are registered and is not required on the job consumer bus instances.

The job distribution strategy is resolved from the container as a scoped service and any class dependencies will be resolved using the current scope of the job type saga state machine. This allows dependencies to be injected, including the current DbContext if using Entity Framework Core.

Job Strategy Options

To support the use of job distribution strategies, new properties were added to JobOptions<TJob>. Following the example above, the MachineType property should be added at startup.

x.AddConsumer<ConvertVideoJobConsumer>(c =>
    c.Options<JobOptions<ConvertVideo>>(options => options
        .SetRetry(r => r.Interval(3, TimeSpan.FromSeconds(30)))
        .SetJobTimeout(TimeSpan.FromMinutes(10))
        .SetConcurrentJobLimit(10)
        .SetJobTypeProperties(p => p.Set("DistributionStrategy", "MachineType"))
        .SetInstanceProperties(p => p.Set("MachineType", "S-Class"))));

The properties should be set using environmental information, such as machine type, data center location, or whatever makes sense for the desired strategy. JobTypeProperties apply to the job type and InstanceProperties apply to the job consumer bus instance (the bus instance containing the job consumer).