Scoped Middleware Filters
Scoped Filters
Most of the built-in filters are created and added to the pipeline during configuration. This approach is typically sufficient. However, there are scenarios where the filter needs access to other components at runtime.
Using a scoped filter allows a new filter instance to be resolved from the container for each message. If a current scope is not available, a new scope will be created using the root container.
Filter Classes
Scoped filters can be either an open generic class implementing one of the supported filter contexts or a concrete class implementing a filter context for one or more valid message types.
For example, a scoped open generic consume filter would be defined as shown below.
public class TFilter<TMessage> :
    IFilter<ConsumeContext<TMessage>>
A concrete consume filter can also be defined.
public class MyMessageConsumeFilter :
    IFilter<ConsumeContext<MyMessage>>
Supported Filter Contexts
Scoped filters are added using one of the following methods, which are specific to the filter context type.
Type | Usage |
---|---|
ConsumeContext<T> | UseConsumeFilter(typeof(TFilter<>), context) |
SendContext<T> | UseSendFilter(typeof(TFilter<>), context) |
PublishContext<T> | UsePublishFilter(typeof(TFilter<>), context) |
ExecuteContext<TArguments> | UseExecuteActivityFilter(typeof(TFilter<>), context) |
CompensateContext<TLog> | UseCompensateActivityFilter(typeof(TFilter<>), context) |
More information can be found in the Middleware section.
UseConsumeFilter
To create a ConsumeContext<T> filter and add it to the receive endpoint:
public class MyConsumeFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    public MyConsumeFilter(IMyDependency dependency) { }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IMyDependency, MyDependency>();
        services.AddMassTransit(x =>
        {
            x.AddConsumer<MyConsumer>();
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.ReceiveEndpoint("input-queue", e =>
                {
                    e.UseConsumeFilter(typeof(MyConsumeFilter<>), context);
                    // The registration context is required to resolve the consumer
                    e.ConfigureConsumer<MyConsumer>(context);
                });
            });
        });
    }
}
To configure a scoped filter for a specific message type (or types) and configure it on all receive endpoints:
public class MyMessageConsumeFilter :
    IFilter<ConsumeContext<MessageA>>,
    IFilter<ConsumeContext<MessageB>>
{
    public MyMessageConsumeFilter(IMyDependency dependency) { }

    public async Task Send(ConsumeContext<MessageA> context, IPipe<ConsumeContext<MessageA>> next)
    {
        await next.Send(context);
    }

    public async Task Send(ConsumeContext<MessageB> context, IPipe<ConsumeContext<MessageB>> next)
    {
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IMyDependency, MyDependency>();
        services.AddMassTransit(x =>
        {
            x.AddConsumer<MyConsumer>();
            x.UsingRabbitMq((context, cfg) =>
            {
                // Applied at the bus level, so it covers every receive endpoint
                cfg.UseConsumeFilter<MyMessageConsumeFilter>(context);
                cfg.ConfigureEndpoints(context);
            });
        });
    }
}
To use an open generic filter but only configure the filter for specific message types:
public class MyCommandFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class, ICommand
{
    public MyCommandFilter(IMyDependency dependency) { }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IMyDependency, MyDependency>();
        services.AddMassTransit(x =>
        {
            x.AddConsumer<MyConsumer>();
            x.UsingRabbitMq((context, cfg) =>
            {
                // Specify a conditional expression to only
                // add the filter for certain message types
                cfg.UseConsumeFilter(typeof(MyCommandFilter<>), context,
                    filter => filter.Include(type => type.HasInterface<ICommand>()));
                cfg.ConfigureEndpoints(context);
            });
        });
    }
}
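The ICommand interface used above is not defined in this section. As a minimal sketch, assuming it is a plain marker interface and using two hypothetical message types (CancelOrder and OrderCancelled are illustrative names only), the distinction the Include expression draws might look like this:

```csharp
using System;

// Assumed marker interface; the section references ICommand without defining it
public interface ICommand
{
}

// Hypothetical command: implements ICommand, so the Include expression matches
// and MyCommandFilter<CancelOrder> satisfies the "where T : class, ICommand" constraint
public class CancelOrder :
    ICommand
{
    public Guid OrderId { get; set; }
}

// Hypothetical event: does not implement ICommand, so the Include expression
// excludes it and no MyCommandFilter<OrderCancelled> is ever constructed
public class OrderCancelled
{
    public Guid OrderId { get; set; }
}
```

Because MyCommandFilter<T> constrains T to ICommand, the generic type cannot be closed over OrderCancelled at all, which is why the open generic filter is paired with a matching Include expression.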
UseSendFilter
To create a SendContext<T> filter and add it to the send pipeline:
public class MySendFilter<T> :
    IFilter<SendContext<T>>
    where T : class
{
    public MySendFilter(IMyDependency dependency) { }

    public async Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
    {
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IMyDependency, MyDependency>();
        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.UseSendFilter(typeof(MySendFilter<>), context);
            });
        });
    }
}
UsePublishFilter
To create a PublishContext<T> filter and add it to the publish pipeline:
public class MyPublishFilter<T> :
    IFilter<PublishContext<T>>
    where T : class
{
    public MyPublishFilter(IMyDependency dependency) { }

    public async Task Send(PublishContext<T> context, IPipe<PublishContext<T>> next)
    {
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IMyDependency, MyDependency>();
        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.UsePublishFilter(typeof(MyPublishFilter<>), context);
            });
        });
    }
}
Combining Consume And Send/Publish Filters
A common use case for scoped filters is transferring data from a consumed message to the messages that are sent or published while it is being handled. This data may be extracted from headers, or may include context or authorization information. In these situations, there are some special requirements to ensure everything works as expected.
The following example has both consume and send filters, and uses a shared dependency to communicate data to outbound messages.
public class MyConsumeFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    public MyConsumeFilter(MyDependency dependency) { }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // Pass the message on, otherwise the consumer is never invoked
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class MySendFilter<T> :
    IFilter<SendContext<T>>
    where T : class
{
    public MySendFilter(MyDependency dependency) { }

    public async Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
    {
        // Pass the message on, otherwise it is never sent
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class MyDependency
{
    public string SomeValue { get; set; }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<MyDependency>();
        services.AddMassTransit(x =>
        {
            x.AddConsumer<MyConsumer>();
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.UseSendFilter(typeof(MySendFilter<>), context);
                cfg.ReceiveEndpoint("input-queue", e =>
                {
                    e.UseConsumeFilter(typeof(MyConsumeFilter<>), context);
                    e.ConfigureConsumer<MyConsumer>(context);
                });
            });
        });
    }
}
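The filters above accept the shared dependency but do not yet use it. As a minimal sketch of one way they might, the consume filter below copies an inbound header into the scoped MyDependency, and the send filter copies that value onto each outbound message. The header name X-Some-Value is a hypothetical choice for illustration, and the using directive assumes a MassTransit version in which IFilter, IPipe, and ProbeContext live in the MassTransit namespace (older versions may also need the GreenPipes namespace).

```csharp
using System.Threading.Tasks;
using MassTransit;

// Scoped holder shared by both filters within a single message scope
public class MyDependency
{
    public string SomeValue { get; set; }
}

public class MyConsumeFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    // Hypothetical header name, not defined by the library
    const string HeaderName = "X-Some-Value";

    readonly MyDependency _dependency;

    public MyConsumeFilter(MyDependency dependency)
    {
        _dependency = dependency;
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // Capture the inbound header (null when absent) before the consumer runs
        _dependency.SomeValue = context.Headers.Get<string>(HeaderName);

        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class MySendFilter<T> :
    IFilter<SendContext<T>>
    where T : class
{
    // Same hypothetical header name as the consume filter
    const string HeaderName = "X-Some-Value";

    readonly MyDependency _dependency;

    public MySendFilter(MyDependency dependency)
    {
        _dependency = dependency;
    }

    public async Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
    {
        // Copy the captured value onto the outbound message, if one was captured
        if (_dependency.SomeValue != null)
            context.Headers.Set(HeaderName, _dependency.SomeValue);

        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}
```

This only works when both filters resolve the same MyDependency instance, which is why the dependency is registered as scoped and why both filters must run inside the same container scope.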
UseMessageScope (for MSDI) or UseMessageLifetimeScope (for Autofac) must be configured before the InMemoryOutbox. If UseMessageRetry is used, it must come before either UseMessageScope or UseMessageLifetimeScope.
Because the InMemoryOutbox delays publishing and sending messages until after the consumer or saga completes, the container scope created for the consumer will already have been disposed by that point. The UseMessageScope or UseMessageLifetimeScope filters create the scope before the InMemoryOutbox, and that scope is then used by the consumer or saga and any scoped filters (consume, publish, or send).
The updated receive endpoint configuration using the InMemoryOutbox is shown below.
cfg.ReceiveEndpoint("input-queue", e =>
{
    e.UseMessageRetry(r => r.Intervals(100, 500, 1000, 2000));
    e.UseMessageScope(context);
    e.UseInMemoryOutbox();
    e.UseConsumeFilter(typeof(MyConsumeFilter<>), context);
    e.ConfigureConsumer<MyConsumer>(context);
});
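MyConsumer is referenced throughout this section but never shown. A minimal sketch of what it might look like follows, assuming hypothetical SubmitOrder and OrderAccepted message types and a hypothetical output-queue destination, none of which come from the original example.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts used only for this sketch
public class SubmitOrder
{
    public Guid OrderId { get; set; }
}

public class OrderAccepted
{
    public Guid OrderId { get; set; }
}

public class MyConsumer :
    IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        // "queue:output-queue" is an assumed destination address
        var endpoint = await context.GetSendEndpoint(new Uri("queue:output-queue"));

        // With the in-memory outbox configured, this send is held
        // until Consume has completed
        await endpoint.Send(new OrderAccepted { OrderId = context.Message.OrderId });
    }
}
```

With the endpoint configuration above, the delayed send passes through the scoped send filter inside the scope created by UseMessageScope, so any scoped dependency populated by the consume filter is still available.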