Job Consumers
In MassTransit, when a message is delivered from the broker to a consumer, it gets locked by the broker until the consumer completes processing. This lock ensures that the message won’t be delivered to other consumers, even on different bus instances reading from the same queue (the competing consumer pattern). Once the consumer completes, the message is acknowledged and removed from the queue. However, if the connection to the broker is lost, the message may be unlocked and redelivered to another consumer. This works well for most cases where the processing time is short.
But what happens when you need to process a message that takes a long time? In scenarios where a task might take several minutes, hours, or even longer, a standard consumer may not be the best fit. This is where Job Consumers come in.
What are Job Consumers?
A Job Consumer in MassTransit is a specialized type of consumer designed to handle long-running tasks, often referred to as jobs. Unlike traditional consumers that lock and process messages quickly, job consumers are built to execute tasks that may take an extended time to complete. They provide additional functionality to manage long-running tasks, including handling retries, concurrency, and ensuring job completion even in the face of system interruptions.
Job consumers are implemented using the IJobConsumer<TJob> interface, where TJob represents the message type that defines the job. This makes them ideal for scenarios where jobs could take minutes or even hours, such as video processing, large data transformations, or background tasks that don’t fit within the typical message lock period provided by brokers like RabbitMQ, Azure Service Bus, or Amazon SQS.
One critical difference is that job consumers require a saga repository to store and manage the job state. They decouple the task of consuming the message from the message broker, allowing more flexibility for handling tasks asynchronously and managing retries without relying solely on broker mechanisms.
Why Should You Use a Job Consumer?
The key question to ask is: How long does the task take to complete?
- If your task takes less than 5 minutes, a standard consumer is usually sufficient. Brokers like RabbitMQ, Azure Service Bus, or Amazon SQS can hold a message lock for around 5 minutes, which is long enough for most tasks.
- If your task exceeds 5 minutes, that’s when you should consider using a job consumer. When tasks exceed the broker’s lock time, you risk message reprocessing or failures due to lock timeouts. Job consumers are specifically designed to handle these scenarios without worrying about broker timeouts.
That said, it’s important to recognize that job consumers introduce some computational overhead due to the additional bookkeeping required to manage job state, retries, and concurrency. Make sure the benefits outweigh the extra complexity before adopting job consumers for your long-running tasks.
MassTransit includes a job service that keeps track of each job, assigns jobs to service instances, and schedules job retries when necessary. The job service uses three saga state machines and the default configuration uses an in-memory saga repository, which is not durable. When using job consumers for production use cases, configuring durable saga repositories is highly recommended to avoid possible message loss. Check out the sample project on GitHub, which includes the Entity Framework configuration for the job service state machines.
Implementation
To use job consumers, you'll need to create a consumer that implements the IJobConsumer<TJob> interface.
public interface IJobConsumer<in TJob> :
    IConsumer
    where TJob : class
{
    Task Run(JobContext<TJob> context);
}
public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // stand-in for long-running work; the delay ends early if the job is canceled
        await Task.Delay(30000, context.CancellationToken);
    }
}
Job Context
In a job consumer, JobContext<TJob> is the job consumer version of ConsumeContext<T>. Since job consumers do not run while a message is in flight or locked, a separate context is used. In addition to the standard message context properties, the job context also includes the following properties.
Property | Type | Description |
---|---|---|
JobId | Guid | The job's identifier assigned when the job was submitted |
AttemptId | Guid | Uniquely identifies this job attempt |
RetryAttempt | int | If greater than zero, the retry attempt of the job |
LastProgressValue | long? | If a previous job attempt updated the progress, the last updated value stored for the job |
LastProgressLimit | long? | If a previous job attempt updated the progress, the last updated limit stored for the job |
ElapsedTime | TimeSpan | How long the current job attempt has been running |
JobProperties | Dictionary | The properties added when the job was submitted |
JobTypeProperties | Dictionary | The properties configured by the JobOptions<T> |
InstanceProperties | Dictionary | The properties configured by the JobOptions<T> on a specific job consumer instance |
Job Cancellation
When a job is canceled, the CancellationToken on JobContext is canceled. Job consumers should check for cancellation using IsCancellationRequested and, when it is safe to cancel, call:
context.CancellationToken.ThrowIfCancellationRequested();
This ensures the job is properly reported as canceled to the job saga state machines.
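The check-then-throw pattern described here can be sketched without MassTransit as follows. This is a minimal illustration, not library code: ChunkedWork and its step delegate are hypothetical names, and in a job consumer the token would be context.CancellationToken.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of MassTransit: runs a fixed number of steps
// and stops cooperatively when the token is canceled.
public static class ChunkedWork
{
    // Returns the number of steps completed. Throws OperationCanceledException
    // if cancellation was requested before all steps finished.
    public static int Run(int stepCount, Action<int> step, CancellationToken token)
    {
        var completed = 0;
        for (var i = 0; i < stepCount; i++)
        {
            // Check between steps, where stopping leaves no partial work behind.
            if (token.IsCancellationRequested)
            {
                // Release resources or save state here, then report the cancellation.
                token.ThrowIfCancellationRequested();
            }

            step(i);
            completed++;
        }

        return completed;
    }
}
```

Checking only between steps keeps each step atomic: a step that has started always finishes, and the exception is raised at a point where the work can be resumed cleanly.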
Job Progress
New in MassTransit v8.3.0
Job consumers can track progress, and that progress is saved by the job saga. If a job is canceled or faults, the most recently saved progress is included in the JobContext passed to the job consumer if the job is retried.
To save progress, call SetJobProgress as shown below.
public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // some aspects of the content being processed
        long length = File.Length;
        await context.SetJobProgress(0, length);
        for (int index = 1; index <= length; index++)
        {
            // do something
            await context.SetJobProgress(index, length);
        }
    }
}
Job State
New in MassTransit v8.3.0
Job consumers can save state in the job saga. If a job is canceled or faults, the previously saved state is included in the JobContext passed to the job consumer when the job is retried.
To save the job state when a job is canceled:
public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // some aspects of the content being processed
        long length = File.Length;
        long index = 1;
        try
        {
            await context.SetJobProgress(0, length);
            for (; index <= length; index++)
            {
                context.CancellationToken.ThrowIfCancellationRequested();
                // do something
                await context.SetJobProgress(index, length);
            }
        }
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
        {
            // save where processing stopped, then rethrow so the job is reported as canceled
            await context.SaveJobState(new ConsumerState { LastIndex = index });
            throw;
        }
    }
    class ConsumerState
    {
        public long LastIndex { get; set; }
    }
}
When the job is started, the consumer can check if a previously saved job state exists, and use it to continue where processing left off.
public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // some aspects of the content being processed
        long length = File.Length;
        // LastIndex is a long, so the resume index must be a long as well
        long index = context.TryGetJobState(out ConsumerState? state)
            ? state.LastIndex + 1
            : 1;
        // elided, see above
    }
}
The job state type (in this case, ConsumerState) is only relevant to the job consumer and is stored as a serialized dictionary in the job saga.
Configuration
The example below configures a job consumer that is automatically configured by ConfigureEndpoints.
services.AddMassTransit(x =>
{
    x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
    {
        cfg.Options<JobOptions<ConvertVideo>>(options => options
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    });
    x.AddDelayedMessageScheduler();
    x.SetKebabCaseEndpointNameFormatter();
    // in this case, just use the in-memory saga repository,
    // but an actual database should be used
    x.SetInMemorySagaRepositoryProvider();
    x.AddJobSagaStateMachines();
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UseDelayedMessageScheduler();
        cfg.ConfigureEndpoints(context);
    });
});
In this example, the job timeout as well as the number of concurrent jobs allowed is configured via JobOptions<T> when configuring the consumer. The job options can also be specified using a consumer definition in the same way.
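A consumer definition carrying the same options might look like the sketch below. It assumes the ConvertVideoJobConsumer and ConvertVideo types from the earlier examples and the three-parameter ConfigureConsumer override found in recent MassTransit v8 releases. The definition class name is hypothetical, and the override signature should be checked against the installed version.

```csharp
using System;
using MassTransit;

// Hypothetical definition class; only the options shown in the example above are set.
public class ConvertVideoJobConsumerDefinition :
    ConsumerDefinition<ConvertVideoJobConsumer>
{
    // Assumed signature: older releases use an overload without IRegistrationContext.
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ConvertVideoJobConsumer> consumerConfigurator, IRegistrationContext context)
    {
        consumerConfigurator.Options<JobOptions<ConvertVideo>>(options => options
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    }
}
```

If used, the definition would be registered together with the consumer, for example with x.AddConsumer<ConvertVideoJobConsumer, ConvertVideoJobConsumerDefinition>(), in place of the inline options.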
Job Options
There are several job options that can be configured, including:
Option | Type | Description |
---|---|---|
ConcurrentJobLimit | int | The number of concurrent jobs allowed to run on a given instance |
JobTimeout | TimeSpan | How long a job consumer is allowed to run before the job is canceled (via the CancellationToken) |
JobCancellationTimeout | TimeSpan | How long after a job consumer is canceled to wait before considering the job canceled regardless of whether it has completed |
JobTypeName | string | Override the default job type name (optional, not really recommended) |
RetryPolicy | IRetryPolicy | The retry policy applied when a job faults |
ProgressBuffer.TimeLimit | TimeSpan | How often any progress updates should be reported to the job saga |
ProgressBuffer.UpdateLimit | int | How many progress updates should be reported before updating the job saga |
JobTypeProperties | Dictionary | Properties associated with the job type (should be the same on every job consumer bus instance) |
InstanceProperties | Dictionary | Properties associated with the currently running job consumer bus instance |
Job Saga Options
When adding the job saga state machines, the JobSagaOptions can also be configured.
x.AddJobSagaStateMachines(options =>
{
    options.FinalizeCompleted = true;
    options.ConcurrentMessageLimit = 32;
    options.HeartbeatTimeout = TimeSpan.FromMinutes(5);
    options.SlotWaitTime = TimeSpan.FromSeconds(30);
    options.SuspectJobRetryCount = 2;
    options.SuspectJobRetryDelay = TimeSpan.FromMinutes(1);
});
Option | Type | Description |
---|---|---|
FinalizeCompleted | bool | Automatically remove completed job sagas |
ConcurrentMessageLimit | int | Specifies the concurrency of the job service sagas (not the actual jobs) |
HeartbeatTimeout | TimeSpan | Period of time after which a job consumer bus instance is removed from the active list if no heartbeat has been received |
SlotWaitTime | TimeSpan | How long a job waits for an available job slot between attempts |
SuspectJobRetryCount | int | How many times to retry a job that becomes suspect (not faulted, but the job consumer bus instance stops responding) |
SuspectJobRetryDelay | TimeSpan | How long a job should wait before retrying when the job became suspect and SuspectJobRetryCount is > 0 |
Job Commands
Submit Job
To submit a job, call the SubmitJob extension method on an IPublishEndpoint as shown below. This is a fire-and-forget method; no response is sent.
[HttpPut("{path}")]
public async Task<IActionResult> FireAndForgetSubmitJob(string path, [FromServices] IPublishEndpoint publishEndpoint)
{
    var jobId = await publishEndpoint.SubmitJob<ConvertVideo>(new ConvertVideo
    {
        Path = path
    });
    return Ok(new
    {
        jobId,
        path
    });
}
To wait for a response indicating the job submission was successful (not really necessary, but commonly used), use the request client, IRequestClient<T>, and submit the job using the SubmitJob extension method as shown below. The RequestId generated by the request client will be used as the JobId.
[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<ConvertVideo> client)
{
    var jobId = await client.SubmitJob(new ConvertVideo
    {
        Path = path
    });
    return Ok(new
    {
        jobId,
        path
    });
}
Additionally, a jobId can be specified if the IRequestClient<SubmitJob<TJob>> interface is used instead.
[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client)
{
    var jobId = NewId.NextGuid();
    await client.SubmitJob(jobId, new ConvertVideo
    {
        Path = path
    });
    return Ok(new
    {
        jobId,
        path
    });
}
To submit a job including job properties (such as a tenantId or other property value typically reflecting some cross-cutting concern or environmental setting such as a data center location, country, etc.), use the overload as shown.
[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client)
{
    var jobId = NewId.NextGuid();
    await client.SubmitJob(jobId, new ConvertVideo
    {
        Path = path
    }, x => x.Set("TenantId", _tenantId));
    return Ok(new
    {
        jobId,
        path
    });
}
Cancel Job
To cancel a submitted job, call the CancelJob extension method on an IPublishEndpoint as shown.
[HttpPut("{jobId}")]
public async Task<IActionResult> CancelJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint)
{
    // jobId is already a parameter, so the result is not assigned to a new local
    await publishEndpoint.CancelJob(jobId);
    return Ok();
}
Retry Job
To retry a faulted or canceled job, call the RetryJob extension method on an IPublishEndpoint as shown.
[HttpPut("{jobId}")]
public async Task<IActionResult> RetryJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.RetryJob(jobId);
    return Ok();
}
Finalize Job
When a job is canceled or faults, the job is not removed from the saga repository. To remove a job in the Canceled or Faulted state, use the FinalizeJob method as shown.
[HttpPut("{jobId}")]
public async Task<IActionResult> FinalizeJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.FinalizeJob(jobId);
    return Ok();
}
Schedule Job
New in MassTransit v8.3.0
By default, submitted jobs will run as soon as possible. Jobs can also be scheduled by specifying a start time, using the ScheduleJob method.
[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client)
{
    // assumes ScheduleJob returns the assigned job identifier, as SubmitJob does in the earlier example
    var jobId = await client.ScheduleJob(DateTimeOffset.Now.AddMinutes(15), new ConvertVideo
    {
        Path = path
    });
    return Ok(new
    {
        jobId,
        path
    });
}
Recurring Jobs
New in MassTransit v8.3.0
MassTransit supports recurring jobs, which are useful when a consumer needs to run on a predefined schedule. Recurring jobs use regular job consumers and are scheduled using the transport's built-in message scheduling and the job saga state machines.
Recurring jobs are configured using a cron expression. Cron expressions are a well-known way to define a schedule and can be built using any of the online cron expression builders. Cron expressions can be very expressive. For instance, 0 0,15,30,45 * * * 1,3,5 would mean at 0, 15, 30, and 45 minutes past the hour, only on Monday, Wednesday, and Friday.
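To make the field positions concrete, the sketch below labels each field of a six-field expression. It assumes the fields are read as seconds, minutes, hours, day of month, month, and day of week, which is the reading consistent with the description of the example expression. CronFields is a hypothetical helper, not a MassTransit API, and it neither validates nor evaluates the schedule.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only: it labels the fields of a
// six-field cron expression and does not validate or evaluate it.
public static class CronFields
{
    // Assumed field order for a six-field expression (seconds first).
    static readonly string[] Names =
    {
        "seconds", "minutes", "hours", "day of month", "month", "day of week"
    };

    public static IReadOnlyDictionary<string, string> Describe(string expression)
    {
        var parts = expression.Split(' ', StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length != Names.Length)
            throw new ArgumentException($"Expected {Names.Length} fields but found {parts.Length}", nameof(expression));

        var fields = new Dictionary<string, string>();
        for (var i = 0; i < Names.Length; i++)
            fields[Names[i]] = parts[i];

        return fields;
    }
}
```

For the example expression, Describe maps minutes to 0,15,30,45 and day of week to 1,3,5, with every other field except seconds left as a wildcard.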
Schedule Recurring Job
To schedule a recurring job, use the AddOrUpdateRecurringJob method as shown below.
public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint)
{
    await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance",
        new RoutineMaintenanceCommand(), "0 0,15,30,45 * * * 1,3,5");
}
public record RoutineMaintenanceCommand;
A simple expression builder is also available, which can be used to generate a cron expression.
public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint)
{
    await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance",
        new RoutineMaintenanceCommand(), x => x.Every(minutes: 15));
}
Recurring jobs can also be confined to a period of time, using start and end dates. Specifying a start date will apply the cron expression to run at the first opportunity after the specified start date. Specifying an end date will ensure the job is not run after that date.
public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint)
{
    await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance",
        new RoutineMaintenanceCommand(), x =>
        {
            x.Start = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero);
            x.End = new DateTimeOffset(2025, 1, 1, 0, 0, 0, TimeSpan.Zero);
            x.Every(minutes: 30);
        });
}
Calling AddOrUpdateRecurringJob can be used to update the job message or the schedule. If the schedule is changed, the next run will be rescheduled using the newly specified cron expression.
Run Recurring Job
To force a recurring job to run immediately, use the RunRecurringJob method.
public async Task RunJobNow(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.RunRecurringJob<RoutineMaintenanceCommand>("RoutineMaintenance");
}
Once the job has completed, the next job run will be scheduled using the previously supplied cron expression.
Job Saga Endpoints
The job saga state machines are configured on their own endpoints, using the configured endpoint name formatter. These endpoints are required on at least one bus instance, but it is not necessary to configure them on every bus instance. In the example above, the job saga state machine endpoints are configured. A standard ConfigureEndpoints call will suffice to host the job consumers without the job saga state machines.
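A bus instance that hosts only the job consumer might be configured along the lines of the sketch below. It reuses the calls from the configuration example and simply leaves out the saga state machine registration. JobWorkerRegistration and AddJobWorker are hypothetical names, and whether such an instance also needs a message scheduler is not covered here.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical registration helper for an instance that only runs jobs.
public static class JobWorkerRegistration
{
    public static IServiceCollection AddJobWorker(this IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
            {
                cfg.Options<JobOptions<ConvertVideo>>(options => options
                    .SetJobTimeout(TimeSpan.FromMinutes(15))
                    .SetConcurrentJobLimit(10));
            });

            // Use the same endpoint name formatter as the instance hosting the job sagas.
            x.SetKebabCaseEndpointNameFormatter();

            // No AddJobSagaStateMachines call: the job sagas are hosted on another bus instance.
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.ConfigureEndpoints(context);
            });
        });

        return services;
    }
}
```

Splitting the roles this way lets the instances running jobs scale independently of the instance that hosts the job sagas and their repository.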
Job Saga Repository
A persistent saga repository should be used with job consumers, and should be configured when adding the job saga state machines. In the example below, Entity Framework Core is configured, along with the Postgres lock statement provider (required when using PostgreSQL).
x.AddJobSagaStateMachines()
    .EntityFrameworkRepository(r =>
    {
        r.ExistingDbContext<JobServiceSagaDbContext>();
        r.UsePostgres();
    });
For a more detailed example of configuring the job saga state machine endpoints, including persistent storage, see the sample project on GitHub mentioned above.
Job Saga State Machines
Job consumers use three saga state machines to orchestrate jobs and keep track of available job consumer bus instances.
State Machine | Description |
---|---|
JobSaga | Orchestrates each job, including scheduling, retry, and failure handling |
JobAttemptSaga | Orchestrates each job attempt, communicating directly with the job consumer bus instances |
JobTypeSaga | Keeps track of available job instances and allocates job slots to waiting jobs |
Job Saga States
A job can be in one of the following states:
- Initial
- Final
- Submitted
- Waiting to Start
- Waiting for Slot
- Started
- Completed
- Faulted
- Canceled
- Starting Job Attempt
- Allocating Job Slot
- Waiting to Retry
- Cancellation Pending
Job Attempt Saga States
A job attempt can be in one of the following states:
- Initial
- Final
- Starting
- Running
- Faulted
- Checking Status
- Suspect
Job Type Saga States
A job type can be in one of the following states:
- Initial
- Final
- Active
- Idle
Job Distribution Strategy
New in MassTransit v8.3.0
To support more complex job consumer scenarios, MassTransit enables the use of a custom job distribution strategy. This strategy is employed by the job type saga to decide which job consumer bus instance should handle a particular job. By configuring JobTypeProperties and InstanceProperties within JobOptions<T>, you can control how jobs are assigned to specific consumer instances. For example, you might allocate jobs from premium customers to consumer instances running on premium hardware, ensuring that resource-intensive jobs are handled by more capable instances.
To use a custom strategy, create a class that implements IJobDistributionStrategy.
public class MachineTypeJobDistributionStrategy :
    IJobDistributionStrategy
{
    public Task<ActiveJob?> IsJobSlotAvailable(ConsumeContext<AllocateJobSlot> context, JobTypeInfo jobTypeInfo)
    {
        object? strategy = null;
        jobTypeInfo.Properties?.TryGetValue("DistributionStrategy", out strategy);
        return strategy switch
        {
            "MachineType" => MachineType(context, jobTypeInfo),
            _ => DefaultJobDistributionStrategy.Instance.IsJobSlotAvailable(context, jobTypeInfo)
        };
    }
    Task<ActiveJob?> MachineType(ConsumeContext<AllocateJobSlot> context, JobTypeInfo jobTypeInfo)
    {
        var customerType = context.GetHeader("CustomerType");
        var machineType = customerType switch
        {
            "Premium" => "S-Class",
            _ => "E-Class"
        };
        // only consider instances below the concurrent job limit; when a machine type
        // is requested, the instance's MachineType property must also match
        var instances = from i in jobTypeInfo.Instances
            join a in jobTypeInfo.ActiveJobs on i.Key equals a.InstanceAddress into ai
            where ai.Count() < jobTypeInfo.ConcurrentJobLimit
                && (string.IsNullOrEmpty(machineType)
                    || (i.Value.Properties.TryGetValue("MachineType", out var mt) && mt is string mtext && mtext == machineType))
            orderby ai.Count(), i.Value.Used
            select new
            {
                Instance = i.Value,
                InstanceAddress = i.Key,
                InstanceCount = ai.Count()
            };
        var firstInstance = instances.FirstOrDefault();
        if (firstInstance == null)
            return Task.FromResult<ActiveJob?>(null);
        return Task.FromResult<ActiveJob?>(new ActiveJob
        {
            JobId = context.Message.JobId,
            InstanceAddress = firstInstance.InstanceAddress
        });
    }
}
Then register the strategy using the TryAddJobDistributionStrategy method:
services.TryAddJobDistributionStrategy<MachineTypeJobDistributionStrategy>();
The strategy must be registered where the job saga state machines are registered and is not required on the job consumer bus instances.
The job distribution strategy is resolved from the container as a scoped service and any class dependencies will be resolved using the current scope of the job type saga state machine. This allows dependencies to be injected, including the current DbContext if using Entity Framework Core.
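The selection rule at the heart of such a strategy can be exercised in isolation. The sketch below uses hypothetical stand-in types (WorkerInstance and InstanceSelector are not MassTransit types) to show the idea: skip instances at or above the concurrent job limit, keep those whose MachineType property matches, and prefer the least busy one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a job consumer bus instance; not a MassTransit type.
public record WorkerInstance(string Address, int ActiveJobs, IReadOnlyDictionary<string, object> Properties);

public static class InstanceSelector
{
    // Returns the address of the least busy instance below the limit whose
    // "MachineType" property matches, or null when no instance qualifies.
    public static string? Select(IEnumerable<WorkerInstance> instances, int concurrentJobLimit, string machineType)
    {
        return instances
            .Where(i => i.ActiveJobs < concurrentJobLimit)
            .Where(i => i.Properties.TryGetValue("MachineType", out var value)
                && value is string text && text == machineType)
            .OrderBy(i => i.ActiveJobs)
            .Select(i => i.Address)
            .FirstOrDefault();
    }
}
```

Returning null when nothing qualifies mirrors the strategy contract shown earlier, where a null result means no job slot is currently available.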
Job Strategy Options
To support the use of job distribution strategies, new properties were added to JobOptions<TJob>. Following the example above, the MachineType property should be added at startup.
x.AddConsumer<ConvertVideoJobConsumer>(c =>
    c.Options<JobOptions<ConvertVideo>>(options => options
        .SetRetry(r => r.Interval(3, TimeSpan.FromSeconds(30)))
        .SetJobTimeout(TimeSpan.FromMinutes(10))
        .SetConcurrentJobLimit(10)
        .SetJobTypeProperties(p => p.Set("DistributionStrategy", "MachineType"))
        .SetInstanceProperties(p => p.Set("MachineType", "S-Class"))));
The properties should be set using environmental information, such as machine type, data center location, or whatever makes sense for the desired strategy.
JobTypeProperties apply to the job type and InstanceProperties apply to the job consumer bus instance (the bus instance containing the job consumer).