Scheduling Configuration

Time is important, particularly in distributed applications. Many systems need to send or publish messages at a later time, and MassTransit has extensive support for scheduling them.

MassTransit supports two different methods of message scheduling:

  1. Scheduler-based, using either Quartz.NET or Hangfire, where the scheduler runs in a service and schedules messages using a queue.
  2. Transport-based, using the transport's built-in message scheduling/delay capabilities. In some cases, such as RabbitMQ, this requires an additional plug-in to be installed and configured.

Recurring schedules are only supported by Quartz.NET or Hangfire.

Configuration

Depending upon the scheduling method used, the bus must be configured to use the appropriate scheduler.

Quartz.NET or Hangfire

services.AddMassTransit(x =>
{
    Uri schedulerEndpoint = new Uri("queue:scheduler");

    x.AddMessageScheduler(schedulerEndpoint);

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UseMessageScheduler(schedulerEndpoint);

        cfg.ConfigureEndpoints(context);
    });
});

RabbitMQ

services.AddMassTransit(x =>
{
    x.AddDelayedMessageScheduler();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UseDelayedMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});

Azure Service Bus

services.AddMassTransit(x =>
{
    x.AddServiceBusMessageScheduler();

    x.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.UseServiceBusMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});

Amazon SQS

services.AddMassTransit(x =>
{
    x.AddDelayedMessageScheduler();

    x.UsingAmazonSqs((context, cfg) =>
    {
        cfg.UseDelayedMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});

SQL Transport

services.AddMassTransit(x =>
{
    x.AddSqlMessageScheduler();

    x.UsingPostgres((context, cfg) =>
    {
        cfg.UseSqlMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});

ActiveMQ

services.AddMassTransit(x =>
{
    x.AddDelayedMessageScheduler();

    x.UsingActiveMq((context, cfg) =>
    {
        cfg.UseDelayedMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});

Usage

To use the message scheduler (outside of a consumer), resolve IMessageScheduler from the container.

Consumer

To schedule messages from a consumer, use any of the ConsumeContext extension methods, such as ScheduleSend.

services.AddMassTransit(x =>
{
    Uri schedulerEndpoint = new Uri("queue:scheduler");
    
    x.AddMessageScheduler(schedulerEndpoint);

    x.AddConsumer<ScheduleNotificationConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UseMessageScheduler(schedulerEndpoint);

        cfg.ConfigureEndpoints(context);
    });
});

public class ScheduleNotificationConsumer :
    IConsumer<ScheduleNotification>
{
    public async Task Consume(ConsumeContext<ScheduleNotification> context)
    {
        Uri notificationService = new Uri("queue:notification-service");

        await context.ScheduleSend<SendNotification>(notificationService,
            context.Message.DeliveryTime, new()
            {
                EmailAddress = context.Message.EmailAddress,
                Body = context.Message.Body
            });
    }
}

public record ScheduleNotification
{
    public DateTime DeliveryTime { get; init; }
    public string EmailAddress { get; init; }
    public string Body { get; init; }
}

public record SendNotification
{
    public string EmailAddress { get; init; }
    public string Body { get; init; }
}

The message scheduler, specified during bus configuration, will be used to schedule the message.

Scope

To schedule messages from a bus, use IMessageScheduler from the container (or create a new one using the bus and appropriate scheduler).

services.AddMassTransit(x =>
{
    Uri schedulerEndpoint = new Uri("queue:scheduler");
    
    x.AddMessageScheduler(schedulerEndpoint);

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UseMessageScheduler(schedulerEndpoint);

        cfg.ConfigureEndpoints(context);
    });
});

// 'provider' is the application's IServiceProvider (for example, host.Services).
await using var scope = provider.CreateAsyncScope();

var scheduler = scope.ServiceProvider.GetRequiredService<IMessageScheduler>();

await scheduler.SchedulePublish<SendNotification>(
    DateTime.UtcNow + TimeSpan.FromSeconds(30), new()
    {
        EmailAddress = "frank@nul.org",
        Body = "Thank you for signing up for our awesome newsletter!"
    });

public record SendNotification
{
    public string EmailAddress { get; init; }
    public string Body { get; init; }
}

Recurring Messages

Using Quartz.NET or Hangfire, you can schedule a message to be sent or published periodically. This functionality requires some knowledge of cron expressions.

A recurring message should have a unique ScheduleId along with an optional ScheduleGroup.

public class PollExternalSystemSchedule : 
    DefaultRecurringSchedule
{
    public PollExternalSystemSchedule()
    {
        ScheduleId = "PollExternalSystem";
        CronExpression = "0 0/1 * 1/1 * ? *"; // this means every minute
    }
}

public record PollExternalSystem;
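
The cron expression in the schedule above is easier to read once each field is named. The following is a minimal sketch and not part of MassTransit or Quartz.NET: the QuartzCronFields helper is a hypothetical name, and it assumes the Quartz.NET field order (seconds, minutes, hours, day-of-month, month, day-of-week, and an optional year). It only labels the fields; it does not validate or evaluate the expression.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not a MassTransit or Quartz.NET API.
public static class QuartzCronFields
{
    // Field order assumed from the Quartz.NET cron format; the seventh field (year) is optional.
    static readonly string[] Names =
        { "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year" };

    // Splits a cron expression on spaces and pairs each part with its field name.
    public static IReadOnlyDictionary<string, string> Label(string cronExpression)
    {
        var parts = cronExpression.Split(' ', StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length < 6 || parts.Length > 7)
            throw new FormatException($"Expected 6 or 7 fields but found {parts.Length}.");

        var labeled = new Dictionary<string, string>();
        for (var i = 0; i < parts.Length; i++)
            labeled[Names[i]] = parts[i];

        return labeled;
    }
}
```

For "0 0/1 * 1/1 * ? *", the helper pairs seconds with 0, minutes with 0/1 (every minute, starting at minute 0), hours with *, day-of-month with 1/1 (every day, starting on day 1), month with *, day-of-week with ? (no specific value), and year with *. Read together, the schedule fires at second 0 of every minute.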

To schedule a recurring message, use the IRecurringMessageScheduler interface, which can be resolved from the container (IServiceProvider). This interface is scoped, so it must be resolved and called from within a valid container scope.

If using in a consumer, add IRecurringMessageScheduler as a constructor dependency.
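
As a rough illustration of the constructor dependency, a consumer might look like the sketch below. The StartPolling message, the StartPollingConsumer class, and the queue:poll-external-system address are hypothetical names chosen for this example; the schedule and message types repeat the ones defined earlier in this section so the block stands on its own.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Scheduling;

// Same schedule and message types as defined earlier in this section.
public class PollExternalSystemSchedule : DefaultRecurringSchedule
{
    public PollExternalSystemSchedule()
    {
        ScheduleId = "PollExternalSystem";
        CronExpression = "0 0/1 * 1/1 * ? *"; // every minute
    }
}

public record PollExternalSystem;

// Hypothetical trigger message; not part of MassTransit.
public record StartPolling;

public class StartPollingConsumer :
    IConsumer<StartPolling>
{
    // Hypothetical destination; use the address of the endpoint that consumes PollExternalSystem.
    static readonly Uri Destination = new Uri("queue:poll-external-system");

    readonly IRecurringMessageScheduler _scheduler;

    // The scheduler is scoped, so it is injected per consumed message.
    public StartPollingConsumer(IRecurringMessageScheduler scheduler)
    {
        _scheduler = scheduler;
    }

    public async Task Consume(ConsumeContext<StartPolling> context)
    {
        await _scheduler.ScheduleRecurringSend(Destination,
            new PollExternalSystemSchedule(), new PollExternalSystem(), context.CancellationToken);
    }
}
```

Because the consumer is resolved inside the scope that MassTransit creates for each message, no explicit scope handling is needed here.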

If using from a hosted service, create a scope by injecting IServiceScopeFactory via the constructor and calling CreateAsyncScope.

var scheduler = scope.ServiceProvider.GetRequiredService<IRecurringMessageScheduler>();

var message = await scheduler.ScheduleRecurringSend(
    InputQueueAddress, new PollExternalSystemSchedule(), new PollExternalSystem());
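
The fragment above assumes that a scope already exists. One way to put the pieces together in a hosted service is sketched below, under a few assumptions: RecurringScheduleHostedService and the queue:poll-external-system address are hypothetical names, the schedule and message types repeat the ones defined earlier in this section, and the bus is assumed to have started before the service runs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Scheduling;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Same schedule and message types as defined earlier in this section.
public class PollExternalSystemSchedule : DefaultRecurringSchedule
{
    public PollExternalSystemSchedule()
    {
        ScheduleId = "PollExternalSystem";
        CronExpression = "0 0/1 * 1/1 * ? *"; // every minute
    }
}

public record PollExternalSystem;

// Hypothetical hosted service for illustration.
public class RecurringScheduleHostedService :
    BackgroundService
{
    // Hypothetical destination; use the address of the endpoint that consumes PollExternalSystem.
    static readonly Uri Destination = new Uri("queue:poll-external-system");

    readonly IServiceScopeFactory _scopeFactory;

    public RecurringScheduleHostedService(IServiceScopeFactory scopeFactory)
    {
        _scopeFactory = scopeFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // IRecurringMessageScheduler is scoped, so resolve it from a scope rather than the root provider.
        await using var scope = _scopeFactory.CreateAsyncScope();

        var scheduler = scope.ServiceProvider.GetRequiredService<IRecurringMessageScheduler>();

        await scheduler.ScheduleRecurringSend(Destination,
            new PollExternalSystemSchedule(), new PollExternalSystem(), stoppingToken);
    }
}
```

The service would be registered with services.AddHostedService<RecurringScheduleHostedService>() alongside the MassTransit configuration.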

When the service stops, or whenever the recurring messages are no longer needed, cancel the recurring schedule so the scheduler stops sending them. The value returned by ScheduleRecurringSend identifies the schedule; the example below cancels it by its ScheduleId and ScheduleGroup.

await scheduler.CancelScheduledRecurringMessage("PollExternalSystem", null);

Quartz.NET

To host Quartz.NET with MassTransit, configure Quartz and MassTransit as shown below.

services.AddQuartz(q =>
{
    q.UseMicrosoftDependencyInjectionJobFactory();
});

services.AddMassTransit(x =>
{
    x.AddPublishMessageScheduler();

    x.AddQuartzConsumers();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UsePublishMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});

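Hangfire

To host Hangfire with MassTransit, configure Hangfire and MassTransit as shown below. This example uses in-memory storage and the in-memory transport.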

services.AddHangfire(h =>
{
    h.UseRecommendedSerializerSettings();
    h.UseMemoryStorage();
});

services.AddMassTransit(x =>
{
    x.AddPublishMessageScheduler();

    x.AddHangfireConsumers();

    x.UsingInMemory((context, cfg) =>
    {
        cfg.UsePublishMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});