Requests
Request/response is a commonly used message pattern where one service sends a request to another service, continuing after the response is received. In a distributed system, this can increase the latency of an application since the service may be hosted in another process, on another machine, or may even be a remote service in another network. While in many cases it is best to avoid request/response use in distributed applications, particularly when the request is a command, it is often necessary and preferred over more complex solutions.
In MassTransit, developers use a request client to send or publish requests and wait for a response. The request client is asynchronous, and supports use of the await keyword since it returns a Task.
Message Contracts
To use the request client, create two message contracts, one for the request and one for the response.
public record CheckOrderStatus
{
public string OrderId { get; init; }
}
public record OrderStatusResult
{
public string OrderId { get; init; }
public DateTime Timestamp { get; init; }
public short StatusCode { get; init; }
public string StatusText { get; init; }
}
Request Consumer
Request messages can be handled by any consumer type, including consumers, sagas, and routing slips. In this case, the consumer below consumes the CheckOrderStatus message and responds with the OrderStatusResult message.
public class CheckOrderStatusConsumer :
IConsumer<CheckOrderStatus>
{
readonly IOrderRepository _orderRepository;
public CheckOrderStatusConsumer(IOrderRepository orderRepository)
{
_orderRepository = orderRepository;
}
public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
var order = await _orderRepository.Get(context.Message.OrderId);
if (order == null)
throw new InvalidOperationException("Order not found");
await context.RespondAsync<OrderStatusResult>(new
{
OrderId = order.Id,
order.Timestamp,
order.StatusCode,
order.StatusText
});
}
}
If the OrderId is found in the repository, an OrderStatusResult message will be sent to the response address included with the request. The waiting request client will handle the response and complete the returned Task allowing the requesting application to continue.
If the OrderId was not found, the consumer throws an exception. MassTransit catches the exception, generates a Fault<CheckOrderStatus>
message, and sends it to the response address. The request client handles the fault message and throws a RequestFaultException via the awaited Task containing the exception detail.
Request Client
To use the request client, add the request client as a dependency as shown in the example API controller below.
public class RequestController :
Controller
{
IRequestClient<CheckOrderStatus> _client;
public RequestController(IRequestClient<CheckOrderStatus> client)
{
_client = client;
}
[HttpGet("{orderId}")]
public async Task<IActionResult> Get(string orderId, CancellationToken cancellationToken)
{
var response = await _client.GetResponse<OrderStatusResult>(new { orderId }, cancellationToken);
return Ok(response.Message);
}
}
The controller method will send the request and return the order status after the response has been received.
If the cancellationToken passed to GetResponse is canceled, the request client will stop waiting for a response. However, the request message produced remains in the queue until it is consumed or the message time-to-live expires. By default, the message time-to-live is set to the request timeout (which defaults to 30 seconds).
Client Configuration
A request client can be resolved using dependency injection for any valid message type, no configuration is required. By default, request messages are published and should be consumed by only one consumer/receive endpoint connected to the message broker. Multiple consumers connected to the same receive endpoint are fine, requests will be load balanced across the connected consumers.
To configure the request client for a message type, add the request client to the configuration explicitly.
services.AddMassTransit(x =>
{
// configure the consumer on a specific endpoint address
x.AddConsumer<CheckOrderStatusConsumer>()
.Endpoint(e => e.Name = "order-status");
// Sends the request to the specified address, instead of publishing it
x.AddRequestClient<CheckOrderStatus>(new Uri("exchange:order-status"));
x.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
}));
});
Request Headers
To create a request and add a header to the SendContext
, one option is to add an execute filter to the request pipeline.
await client.GetResponse<OrderStatusResult>(new GetOrderStatus{ OrderId = orderId },
x => x.UseExecute(context => context.Headers.Set("tenant-id", "some-value")));
Another option is to use the object values overload, which uses a message initializer, to specify the header value. Learn more about message initializers in the Concepts section.
await client.GetResponse<OrderStatusResult>(new
{
orderId,
__Header_Tenant_Id = "some-value"
});
Multiple Response Types
Another powerful feature with the request client is the ability support multiple (such as positive and negative) result types. For example, adding an OrderNotFound
response type to the consumer as shown eliminates throwing an exception since a missing order isn't really a fault.
public class CheckOrderStatusConsumer :
IConsumer<CheckOrderStatus>
{
public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
var order = await _orderRepository.Get(context.Message.OrderId);
if (order == null)
await context.RespondAsync<OrderNotFound>(context.Message);
else
await context.RespondAsync<OrderStatusResult>(new
{
OrderId = order.Id,
order.Timestamp,
order.StatusCode,
order.StatusText
});
}
}
The client can now wait for multiple response types (in this case, two) by using a little tuple magic.
var response = await client.GetResponse<OrderStatusResult, OrderNotFound>(new { OrderId = id});
if (response.Is(out Response<OrderStatusResult> responseA))
{
// do something with the order
}
else if (response.Is(out Response<OrderNotFound> responseB))
{
// the order was not found
}
This cleans up the processing, an eliminates the need to catch a RequestFaultException
.
It's also possible to use some of the switch expressions via deconstruction, but this requires the response variable to be explicitly specified as Response
.
Response response = await client.GetResponse<OrderStatusResult, OrderNotFound>(new { OrderId = id});
// Using a regular switch statement
switch (response)
{
case (_, OrderStatusResult a) responseA:
// order found
break;
case (_, OrderNotFound b) responseB:
// order not found
break;
}
// Or using a switch expression
var accepted = response switch
{
(_, OrderStatusResult a) => true,
(_, OrderNotFound b) => false,
_ => throw new InvalidOperationException()
};
Accept Response Types
The request client sets a message header, MT-Request-AcceptType
, that contains the response types supported by the request client. This allows the request consumer to determine if the client can handle a response type, which can be useful as services evolve and new response types may be added to handle new conditions. For instance, if a consumer adds a new response type, such as OrderAlreadyShipped
, if the response type isn't supported an exception may be thrown instead.
To see this in code, check out the client code:
var response = await client.GetResponse<OrderCanceled, OrderNotFound>(new CancelOrder());
if (response.Is(out Response<OrderCanceled> canceled))
{
return Ok();
}
else if (response.Is(out Response<OrderNotFound> responseB))
{
return NotFound();
}
The original consumer, prior to adding the new response type:
public async Task Consume(ConsumeContext<CancelOrder> context)
{
var order = _repository.Load(context.Message.OrderId);
if(order == null)
{
await context.ResponseAsync<OrderNotFound>(new { context.Message.OrderId });
return;
}
order.Cancel();
await context.RespondAsync<OrderCanceled>(new { context.Message.OrderId });
}
Now, the new consumer that checks if the order has already shipped:
public async Task Consume(ConsumeContext<CancelOrder> context)
{
var order = _repository.Load(context.Message.OrderId);
if(order == null)
{
await context.ResponseAsync<OrderNotFound>(new { context.Message.OrderId });
return;
}
if(order.HasShipped)
{
if (context.IsResponseAccepted<OrderAlreadyShipped>())
{
await context.RespondAsync<OrderAlreadyShipped>(new { context.Message.OrderId, order.ShipDate });
return;
}
else
throw new InvalidOperationException("The order has already shipped"); // to throw a RequestFaultException in the client
}
order.Cancel();
await context.RespondAsync<OrderCanceled>(new { context.Message.OrderId });
}
This way, the consumer can check the request client response types and act accordingly.
MT-Request-AcceptType
header is not found, IsResponseAccepted
will return true for all message types.Concurrent Requests
If there were multiple requests to be performed, it is easy to wait on all results at the same time, benefiting from the concurrent operation.
public class RequestController :
Controller
{
IRequestClient<RequestA> _clientA;
IRequestClient<RequestB> _clientB;
public RequestController(IRequestClient<RequestA> clientA, IRequestClient<RequestB> clientB)
{
_clientA = clientA;
_clientB = clientB;
}
public async Task<ActionResult> Get()
{
var resultA = _clientA.GetResponse(new RequestA());
var resultB = _clientB.GetResponse(new RequestB());
await Task.WhenAll(resultA, resultB);
var a = await resultA;
var b = await resultB;
var model = new Model(a.Message, b.Message);
return View(model);
}
}
The power of concurrency, for the win!
Request Handle
Client factories or the request client can also be used to create a request instead of calling GetResponse
. This is an uncommon scenario, but is available as an option and may make sense depending on the situation. If a request is created (which returns a RequestHandle<T>
), the request handle must be disposed after the request completes.
Using
Create
returns a request handle, which can be used to set headers and other attributes of the request before it is sent.
public interface IRequestClient<TRequest>
where TRequest : class
{
RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}
For
RequestTimeout
three options are available,None
,Default
, and a factory withRequestTimeout.After
.None
would never be recommended since it would essentially wait forever for a response. There is always a relevant timeout, or you're using the wrong pattern.
Request Client Factory
The internals are documented for understanding, but what follows is optional reading. The above container-based configuration handles all the details to ensure the proper context is used.
The request client is composed of two parts, a client factory and a request client. There are two client factories, the scoped client factory, and the bus client factory.
IScopedClientFactory
Using IRequestClient
requires a container scope, and the request client for a request message type is resolved from container scope using a scoped client factory. As an alternative to specifying IRequestClient<T>
as a constructor dependency, the scoped client factory can be used instead of create a request client directly. This can be useful when the destination address may change based on context, such as a TenantId.
public interface IScopedClientFactory
{
IRequestClient<T> CreateRequestClient<T>(RequestTimeout timeout = default)
where T : class;
IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout = default)
where T : class;
}
An example showing how to use IScopedClientFactory
is shown below.
[HttpGet]
public async Task<IActionResult> HandleGet(string tenantId, int id, [FromServices] IScopedClientFactory clientFactory)
{
var serviceAddress = new Uri($"exchange:check-order-status-{tenantId}");
var client = clientFactory.CreateRequestClient<CheckOrderStatus>(serviceAddress);
var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});
return Ok();
}
IClientFactory
If there is no container scope available, and one cannot be created, the root client factory can be used instead. Note that non-scoped interfaces are not compatible with scoped publish or send filters.
public interface IClientFactory
{
IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);
IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);
}
An example showing how to use IClientFactory
is shown below.
public async Task WorkerMethod(IServiceProvider provider)
{
var clientFactory = provider.GetRequiredService<IClientFactory>();
var serviceAddress = new Uri("exchange:check-order-status");
var client = clientFactory.CreateRequestClient<CheckOrderStatus>(serviceAddress);
var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});
}