Requests
Request/response is a commonly used message pattern where one service sends a request to another service, continuing after the response is received. In a distributed system, this can increase the latency of an application since the service may be hosted in another process, on another machine, or may even be a remote service in another network. While in many cases it is best to avoid request/response use in distributed applications, particularly when the request is a command, it is often necessary and preferred over more complex solutions.
In MassTransit, developers use a request client to send or publish requests and wait for a response. The request client is asynchronous, and supports use of the await keyword since it returns a Task.
Message Contracts
To use the request client, create two message contracts, one for the request and one for the response.
public record CheckOrderStatus
{
    public string OrderId { get; init; }
}

public record OrderStatusResult
{
    public string OrderId { get; init; }
    public DateTime Timestamp { get; init; }
    public short StatusCode { get; init; }
    public string StatusText { get; init; }
}
Request Consumer
Request messages can be handled by any consumer type, including consumers, sagas, and routing slips. In this case, the consumer below consumes the CheckOrderStatus message and responds with the OrderStatusResult message.
public class CheckOrderStatusConsumer :
    IConsumer<CheckOrderStatus>
{
    readonly IOrderRepository _orderRepository;

    public CheckOrderStatusConsumer(IOrderRepository orderRepository)
    {
        _orderRepository = orderRepository;
    }

    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        var order = await _orderRepository.Get(context.Message.OrderId);
        if (order == null)
            throw new InvalidOperationException("Order not found");

        await context.RespondAsync<OrderStatusResult>(new
        {
            OrderId = order.Id,
            order.Timestamp,
            order.StatusCode,
            order.StatusText
        });
    }
}
If the OrderId is found in the repository, an OrderStatusResult message will be sent to the response address included with the request. The waiting request client will handle the response and complete the returned Task allowing the requesting application to continue.
If the OrderId was not found, the consumer throws an exception. MassTransit catches the exception, generates a Fault<CheckOrderStatus> message, and sends it to the response address. The request client handles the fault message and throws a RequestFaultException via the awaited Task containing the exception detail.
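On the requesting side, the fault surfaces as an exception on the awaited call. The following is a minimal sketch, assuming the CheckOrderStatus and OrderStatusResult contracts defined above; the OrderStatusQueries class and the TryGetStatusText method are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using MassTransit;

public static class OrderStatusQueries
{
    // Hypothetical helper: returns the status text, or null when the consumer faulted.
    public static async Task<string?> TryGetStatusText(
        IRequestClient<CheckOrderStatus> client, string orderId)
    {
        try
        {
            Response<OrderStatusResult> response =
                await client.GetResponse<OrderStatusResult>(new { OrderId = orderId });

            return response.Message.StatusText;
        }
        catch (RequestFaultException ex)
        {
            // ex.Fault carries the exception details captured where the consumer ran
            var reason = ex.Fault?.Exceptions?.FirstOrDefault()?.Message;
            Console.WriteLine($"Request faulted: {reason ?? ex.Message}");
            return null;
        }
    }
}
```

Whether a fault should become a null, an HTTP error, or a retry depends on the application; the sketch only shows where the exception can be observed.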
Request Client
To use the request client, add the request client as a dependency as shown in the example API controller below.
public class RequestController :
    Controller
{
    IRequestClient<CheckOrderStatus> _client;

    public RequestController(IRequestClient<CheckOrderStatus> client)
    {
        _client = client;
    }

    [HttpGet("{orderId}")]
    public async Task<IActionResult> Get(string orderId, CancellationToken cancellationToken)
    {
        var response = await _client.GetResponse<OrderStatusResult>(new { orderId }, cancellationToken);

        return Ok(response.Message);
    }
}
The controller method will send the request and return the order status after the response has been received.
If the cancellationToken passed to GetResponse is canceled, the request client will stop waiting for a response. However, the request message produced remains in the queue until it is consumed or the message time-to-live expires. By default, the message time-to-live is set to the request timeout (which defaults to 30 seconds).
Client Configuration
A request client can be resolved using dependency injection for any valid message type; no configuration is required. By default, request messages are published and should be consumed by only one consumer/receive endpoint connected to the message broker. Multiple consumers connected to the same receive endpoint are fine; requests will be load balanced across the connected consumers.
To configure the request client for a message type, add the request client to the configuration explicitly.
services.AddMassTransit(x =>
{
    // configure the consumer on a specific endpoint address
    x.AddConsumer<CheckOrderStatusConsumer>()
        .Endpoint(e => e.Name = "order-status");

    // Sends the request to the specified address, instead of publishing it
    x.AddRequestClient<CheckOrderStatus>(new Uri("exchange:order-status"));

    x.UsingInMemory((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
Request Headers
To create a request and add a header to the SendContext, one option is to add an execute filter to the request pipeline.
await client.GetResponse<OrderStatusResult>(new CheckOrderStatus { OrderId = orderId },
    x => x.UseExecute(context => context.Headers.Set("tenant-id", "some-value")));
Another option is to use the object values overload, which uses a message initializer, to specify the header value. Learn more about message initializers in the Concepts section.
await client.GetResponse<OrderStatusResult>(new
{
    orderId,
    __Header_Tenant_Id = "some-value"
});
Multiple Response Types
Another powerful feature of the request client is the ability to support multiple (such as positive and negative) result types. For example, adding an OrderNotFound response type to the consumer as shown eliminates throwing an exception since a missing order isn't really a fault.
public class CheckOrderStatusConsumer :
    IConsumer<CheckOrderStatus>
{
    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        var order = await _orderRepository.Get(context.Message.OrderId);
        if (order == null)
            await context.RespondAsync<OrderNotFound>(context.Message);
        else
            await context.RespondAsync<OrderStatusResult>(new
            {
                OrderId = order.Id,
                order.Timestamp,
                order.StatusCode,
                order.StatusText
            });
    }
}
The client can now wait for multiple response types (in this case, two) by using a little tuple magic.
var response = await client.GetResponse<OrderStatusResult, OrderNotFound>(new { OrderId = id });

if (response.Is(out Response<OrderStatusResult> responseA))
{
    // do something with the order
}
else if (response.Is(out Response<OrderNotFound> responseB))
{
    // the order was not found
}
This cleans up the processing and eliminates the need to catch a RequestFaultException.
It's also possible to use some of the switch expressions via deconstruction, but this requires the response variable to be explicitly specified as Response.
Response response = await client.GetResponse<OrderStatusResult, OrderNotFound>(new { OrderId = id });

// Using a regular switch statement
switch (response)
{
    case (_, OrderStatusResult a) responseA:
        // order found
        break;
    case (_, OrderNotFound b) responseB:
        // order not found
        break;
}

// Or using a switch expression
var accepted = response switch
{
    (_, OrderStatusResult a) => true,
    (_, OrderNotFound b) => false,
    _ => throw new InvalidOperationException()
};
Accept Response Types
The request client sets a message header, MT-Request-AcceptType, that contains the response types supported by the request client. This allows the request consumer to determine if the client can handle a response type, which can be useful as services evolve and new response types may be added to handle new conditions. For instance, if a consumer adds a new response type, such as OrderAlreadyShipped, it can check whether the client accepts that type and throw an exception instead when it does not.
To see this in code, check out the client code:
var response = await client.GetResponse<OrderCanceled, OrderNotFound>(new CancelOrder());

if (response.Is(out Response<OrderCanceled> canceled))
{
    return Ok();
}
else if (response.Is(out Response<OrderNotFound> responseB))
{
    return NotFound();
}
The original consumer, prior to adding the new response type:
public async Task Consume(ConsumeContext<CancelOrder> context)
{
    var order = _repository.Load(context.Message.OrderId);
    if (order == null)
    {
        await context.RespondAsync<OrderNotFound>(new { context.Message.OrderId });
        return;
    }

    order.Cancel();

    await context.RespondAsync<OrderCanceled>(new { context.Message.OrderId });
}
Now, the new consumer that checks if the order has already shipped:
public async Task Consume(ConsumeContext<CancelOrder> context)
{
    var order = _repository.Load(context.Message.OrderId);
    if (order == null)
    {
        await context.RespondAsync<OrderNotFound>(new { context.Message.OrderId });
        return;
    }

    if (order.HasShipped)
    {
        // Only respond with the new type when the client declared support for it
        if (context.IsResponseAccepted<OrderAlreadyShipped>())
        {
            await context.RespondAsync<OrderAlreadyShipped>(new { context.Message.OrderId, order.ShipDate });
            return;
        }
        else
            throw new InvalidOperationException("The order has already shipped"); // to throw a RequestFaultException in the client
    }

    order.Cancel();

    await context.RespondAsync<OrderCanceled>(new { context.Message.OrderId });
}
This way, the consumer can check the request client response types and act accordingly.
If the MT-Request-AcceptType header is not found, IsResponseAccepted will return true for all message types.
Concurrent Requests
If there were multiple requests to be performed, it is easy to wait on all results at the same time, benefiting from the concurrent operation.
public class RequestController :
    Controller
{
    IRequestClient<RequestA> _clientA;
    IRequestClient<RequestB> _clientB;

    public RequestController(IRequestClient<RequestA> clientA, IRequestClient<RequestB> clientB)
    {
        _clientA = clientA;
        _clientB = clientB;
    }

    public async Task<ActionResult> Get()
    {
        // GetResponse needs an explicit response type; ResponseA and ResponseB are
        // placeholder names for the response contracts, which this example does not define
        var resultA = _clientA.GetResponse<ResponseA>(new RequestA());
        var resultB = _clientB.GetResponse<ResponseB>(new RequestB());

        // Both requests are already in flight; wait for both to complete
        await Task.WhenAll(resultA, resultB);

        var a = await resultA;
        var b = await resultB;

        var model = new Model(a.Message, b.Message);

        return View(model);
    }
}
The power of concurrency, for the win!
Request Client Details
The internals are documented for understanding, but what follows is optional reading. The above container-based configuration handles all the details to ensure the proper context is used.
The request client is composed of two parts, a client factory, and a request client. The client factory is created from the bus, or a connected endpoint, and has the interface below (some overloads are omitted, but you get the idea).
public interface IClientFactory
{
    IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);

    IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);

    RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);

    RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
}
As shown, the client factory can create a request client, or it can create a request directly. There are advantages to each approach, although it's typically best to create a request client and use it if possible. If a consumer is sending the request, a new client should be created for each message (and is handled automatically if you're using a dependency injection container and the container registration methods).
To create a client factory, call bus.CreateClientFactory or host.CreateClientFactory after the bus has been started.
The request client can be used to create requests (returning a RequestHandle<T>, which must be disposed after the request completes) or it can be used directly to send a request and get a response (asynchronously, of course).
Using Create returns a request handle, which can be used to set headers and other attributes of the request before it is sent.
public interface IRequestClient<TRequest>
    where TRequest : class
{
    RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);

    Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}
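As an illustration of the request handle, the following sketch creates a request, adds a header, and then waits for the response. It assumes the CheckOrderStatus and OrderStatusResult contracts from earlier; the OrderStatusRequests class, the GetStatusWithTenant method, and the tenant-id header name are hypothetical.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

public static class OrderStatusRequests
{
    // Hypothetical helper showing the Create / configure / GetResponse sequence.
    public static async Task<OrderStatusResult> GetStatusWithTenant(
        IRequestClient<CheckOrderStatus> client,
        string orderId,
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        // Create builds the request handle; disposing it releases the handle afterwards
        using RequestHandle<CheckOrderStatus> request =
            client.Create(new CheckOrderStatus { OrderId = orderId }, cancellationToken);

        // Configure the send pipeline before the request goes out
        request.UseExecute(context => context.Headers.Set("tenant-id", tenantId));

        // Registers the expected response type, sends the request, and awaits the reply
        Response<OrderStatusResult> response = await request.GetResponse<OrderStatusResult>();

        return response.Message;
    }
}
```

For a single header, the GetResponse overloads shown under Request Headers are shorter; the handle is mainly useful when several attributes need to be set before sending.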
For RequestTimeout, three options are available: None, Default, and a factory with RequestTimeout.After. None would never be recommended since it would essentially wait forever for a response. There is always a relevant timeout, or you're using the wrong pattern.
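A short sketch of passing an explicit timeout follows. It assumes the CheckOrderStatus and OrderStatusResult contracts from earlier; the OrderStatusTimeouts class and the GetStatusOrNull method are hypothetical, and the five-second value is arbitrary.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

public static class OrderStatusTimeouts
{
    // Hypothetical helper: returns null when no response arrives within the timeout.
    public static async Task<OrderStatusResult?> GetStatusOrNull(
        IRequestClient<CheckOrderStatus> client,
        string orderId,
        CancellationToken cancellationToken = default)
    {
        try
        {
            // Wait at most five seconds instead of the default timeout
            Response<OrderStatusResult> response = await client.GetResponse<OrderStatusResult>(
                new { OrderId = orderId },
                cancellationToken,
                RequestTimeout.After(s: 5));

            return response.Message;
        }
        catch (RequestTimeoutException)
        {
            // No response was received before the timeout elapsed
            return null;
        }
    }
}
```

Treating a timeout as "no result" is only one possible policy; callers that need to distinguish a slow service from a missing order would let the exception propagate instead.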
Sending a Request
To create a request client, and use it to make a standalone request (not from a consumer, API controller, etc.):
var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);

var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id });
The response type, Response<OrderStatusResult>, includes the MessageContext from when the response was received, providing access to the message properties (such as response.ConversationId) and headers (response.Headers).
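To make the message context concrete, the sketch below reads a few properties and lists the headers from a received response. It assumes the CheckOrderStatus and OrderStatusResult contracts from earlier; the ResponseInspection class and the GetAndLog method are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public static class ResponseInspection
{
    // Hypothetical helper: sends the request, logs the response context, returns the response.
    public static async Task<Response<OrderStatusResult>> GetAndLog(
        IRequestClient<CheckOrderStatus> client, string id)
    {
        Response<OrderStatusResult> response =
            await client.GetResponse<OrderStatusResult>(new { OrderId = id });

        // Message properties exposed through the MessageContext
        Console.WriteLine($"ConversationId: {response.ConversationId}");
        Console.WriteLine($"SourceAddress: {response.SourceAddress}");

        // Headers attached to the response message
        foreach (var header in response.Headers.GetAll())
            Console.WriteLine($"{header.Key} = {header.Value}");

        return response;
    }
}
```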