Observability

Monitoring

Open Telemetry

OpenTelemetry is an open-source standard for distributed tracing, which allows you to collect and analyze data about the performance of your systems. MassTransit can be configured to use OpenTelemetry to instrument message handling, so that you can collect telemetry data about messages as they flow through your system.

By using OpenTelemetry with MassTransit, you can gain insights into the performance of your systems, which can help you to identify and troubleshoot issues, and to improve the overall performance of your application.

There is a good set of examples opentelemetry-dotnet how it can be used for different cases

Tracing

ASP.NET Core application

This example is using following packages:

  • OpenTelemetry.Extensions.Hosting
  • OpenTelemetry.Exporter.Console
var builder = WebApplication.CreateBuilder(args);

void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

builder.Services.AddOpenTelemetry()
    .ConfigureResource(ConfigureResource)
    .WithTracing(b => b
        .AddSource(DiagnosticHeaders.DefaultListenerName) // MassTransit ActivitySource
        .AddConsoleExporter() // Any OTEL suportable exporter can be used here
    );
Console application

This example is using following packages:

  • OpenTelemetry
  • OpenTelemetry.Exporter.Console
void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

Sdk.CreateTracerProviderBuilder()
    .ConfigureResource(ConfigureResource)
    .AddSource(DiagnosticHeaders.DefaultListenerName) // MassTransit ActivitySource
    .AddConsoleExporter() // Any OTEL suportable exporter can be used here
    .Build()

That's it you application will start exporting MassTransit related traces within your application

Metrics

ASP.NET Core application

This example is using following packages:

  • OpenTelemetry.Extensions.Hosting
  • OpenTelemetry.Exporter.Console
var builder = WebApplication.CreateBuilder(args);

void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

builder.Services.AddOpenTelemetry()
    .ConfigureResource(ConfigureResource)
    .WithMetrics(b => b
        .AddMeter(InstrumentationOptions.MeterName) // MassTransit Meter
        .AddConsoleExporter() // Any OTEL suportable exporter can be used here
    );
Console application

This example is using following packages:

  • OpenTelemetry
  • OpenTelemetry.Exporter.Console
void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

Sdk.CreateTracerProviderBuilder()
    .ConfigureResource(ConfigureResource)
    .AddMeter(InstrumentationOptions.MeterName) // MassTransit Meter
    .AddConsoleExporter() // Any OTEL suportable exporter can be used here
    .Build()

The OpenTelemetry metrics captured by MassTransit:

Counters

NameDescription
messaging_masstransit_receiveNumber of messages received
messaging_masstransit_receive_errorsNumber of messages receive faults
messaging_masstransit_consumeNumber of messages consumed
messaging_masstransit_consume_errorsNumber of message consume faults
messaging_masstransit_sagaNumber of messages processed by saga
messaging_masstransit_saga_errorsNumber of message faults by saga
messaging_masstransit_consume_retriesNumber of message consume retries
messaging_masstransit_handlerNumber of messages handled
messaging_masstransit_handler_errorsNumber of message handler faults
messaging_masstransit_outbox_deliveryNumber of messages delivered by outbox
messaging_masstransit_outbox_delivery_errorsNumber of message delivery faults by outbox
messaging_masstransit_sendNumber of messages sent
messaging_masstransit_send_errorsNumber of message send faults
messaging_masstransit_outbox_sendNumber of messages sent to outbox
messaging_masstransit_outbox_send_errorsNumber of message send faults to outbox
messaging_masstransit_executeNumber of activities executed
messaging_masstransit_execute_errorsNumber of activity execution faults
messaging_masstransit_compensateNumber of activities compensated
messaging_masstransit_compensate_errorsNumber of activity compensation failures

Gauges

NameDescription
messaging_masstransit_receive_activeNumber of messages being received
messaging_masstransit_consume_activeNumber of consumers in progress
messaging_masstransit_execute_activeNumber of activity executions in progress
messaging_masstransit_compensate_activeNumber of activity compensations in progress
messaging_masstransit_handler_activeNumber of handlers in progress
messaging_masstransit_saga_activeNumber of sagas in progress

Histograms

NameDescription
messaging_masstransit_receive_durationElapsed time spent receiving a message, in millis
messaging_masstransit_consume_durationElapsed time spent consuming a message, in millis
messaging_masstransit_saga_durationElapsed time spent saga processing a message, in millis
messaging_masstransit_handler_durationElapsed time spent handler processing a message, in millis
messaging_masstransit_delivery_durationsElapsed time between when the message was sent and when it was consumed, in millis
messaging_masstransit_execute_durationElapsed time spent executing an activity, in millis
messaging_masstransit_compensate_durationElapsed time spent compensating an activity, in millis

Metric names and labels can be configured with Options:

services.Configure<InstrumentationOptions>(options =>
{
    // Configure
});

Application Insights

Azure Monitor has direct integration with Open Telemetry:

ASP.NET Core application

This example is using following packages:

  • OpenTelemetry.Extensions.Hosting
  • Azure.Monitor.OpenTelemetry.Exporter
var builder = WebApplication.CreateBuilder(args);

void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

builder.Services.AddOpenTelemetry()
    .ConfigureResource(ConfigureResource)
    .WithTracing(b => b
        .AddSource(DiagnosticHeaders.DefaultListenerName) // MassTransit ActivitySource
        .AddAzureMonitorTraceExporter(
        {
            o.ConnectionString = "<Your Connection String>";
        }))
    .WithMetrics(b => b
        .AddMeter(InstrumentationOptions.MeterName) // MassTransit Meter
        .AddAzureMonitorMetricExporter(o =>
        {
            o.ConnectionString = "<Your Connection String>";
        }));
Console application

This example is using following packages:

  • OpenTelemetry
  • Azure.Monitor.OpenTelemetry.Exporter
void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

Sdk.CreateTracerProviderBuilder()
    .ConfigureResource(ConfigureResource)
    .AddSource(DiagnosticHeaders.DefaultListenerName) // MassTransit ActivitySource
    .AddAzureMonitorTraceExporter(
    {
        o.ConnectionString = "<Your Connection String>";
    })
    .Build();

Sdk.CreateTracerProviderBuilder()
    .ConfigureResource(ConfigureResource)
    .AddMeter(InstrumentationOptions.MeterName) // MassTransit Meter
    .AddAzureMonitorMetricExporter(o =>
    {
        o.ConnectionString = "<Your Connection String>";
    })
    .Build()

You can also refer to the sample: Sample-ApplicationInsights

Prometheus

Open Telemetry is more preferable choice of integration

Open Telemetry integration

This example is using following packages:

  • OpenTelemetry.Extensions.Hosting
  • OpenTelemetry.Exporter.Prometheus
  • OpenTelemetry.Exporter.Prometheus.AspNetCore
void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

builder.Services.AddOpenTelemetry()
    .ConfigureResource(ConfigureResource)
    .WithMetrics(b => b
        .AddMeter(InstrumentationOptions.MeterName) // MassTransit Meter
        .AddPrometheusExporter()
    );
    
var app = builder.Build();

app.UseOpenTelemetryPrometheusScrapingEndpoint(); // Map prometheus metrics endpoint

In case you want to migrate from direct integration to using Open Telemetry, and use previous metric names, just configure them through Options:

builder.Services.Configure<InstrumentationOptions>(options =>
{
    ReceiveTotal = "mt.receive.total";
    // Configure other names by using similar approach
});

Direct integration

alt NuGet

MassTransit supports Prometheus metric capture, which provides useful observability into the bus, endpoints, consumers, and messages.

The prometheus-net library is used as the Prometheus client since it is mentioned on the Prometheus client list.

Installation

$ dotnet add package prometheus-net.AspNetCore
$ dotnet add package MassTransit.Prometheus

Configuration

To configure the bus to capture metrics, add the UsePrometheusMetrics() method to your bus configuration.

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UsePrometheusMetrics(serviceName: "order_service");
    });
});

To then mount the metrics to /metrics go to your Program.cs and add

app.UseEndpoints(endpoints =>
{
    endpoints.MapMetrics();
});

For more details, see the Prometheus-Net Documentation.

Metrics Captured

The metrics captured by MassTransit are listed below.

NameDescription
mt_receive_totalTotal number of messages received
mt_receive_fault_totalTotal number of messages receive faults
mt_receive_duration_secondsElapsed time spent receiving messages, in seconds
mt_receive_in_progressNumber of messages being received
mt_consume_totalTotal number of messages consumed
mt_consume_fault_totalTotal number of message consume faults
mt_consume_retry_totalTotal number of message consume retries
mt_consume_duration_secondsElapsed time spent consuming a message, in seconds
mt_delivery_duration_secondsElapsed time between when the message was sent and when it was consumed, in seconds.
mt_publish_totalTotal number of messages published
mt_publish_fault_totalTotal number of message publish faults
mt_send_totalTotal number of messages sent
mt_send_fault_totalTotal number of message send faults
mt_busNumber of bus instances
mt_endpointNumber of receive endpoint instances
mt_consumer_in_progressNumber of consumers in progress
mt_handler_in_progressNumber of handlers in progress
mt_saga_in_progressNumber of sagas in progress
mt_activity_execute_in_progressNumber of activity executions in progress
mt_activity_compensate_in_progressNumber of activity compensations in progress
mt_activity_execute_totalTotal number of activities executed
mt_activity_execute_fault_totalTotal number of activity executions faults
mt_activity_execute_duration_secondsElapsed time spent executing an activity, in seconds
mt_activity_compensate_totalTotal number of activities compensated
mt_activity_compensate_failure_totalTotal number of activity compensation failures
mt_activity_compensate_duration_secondsElapsed time spent compensating an activity, in seconds

Labels

For the metrics above, labels are specified where appropriate.

NameDescription
service_nameThe service name specified at bus configuration
endpoint_addressThe endpoint address
message_typeThe message type for the metric
consumer_typeThe consumer, saga, or activity type for the metric
activity_nameThe activity name
argument_typeThe activity execute argument type
log_typeThe activity compensate log type
exception_typeThe exception type for a fault metric

Example Docker Compose

version: "3.7"

services:
  prometheus:
    image: prom/prometheus
    ports:
     - "9090:9090"

Example MassTransit Prometheus Config File

You can use the domain host.docker.internal to access process running on the host machine.

global:
  scrape_interval: 10s

scrape_configs:
  - job_name: masstransit
    tls_config:
      insecure_skip_verify: true
    scheme: https
    static_configs:
      - targets:
        - 'host.docker.internal:5001'

Lifetime Observers

MassTransit supports several message observers allowing received, consumed, sent, and published messages to be monitored. There is a bus observer as well, so that the bus life cycle can be monitored.

Observers should not be used to modify or intercept messages. To intercept messages to add headers or modify message content, create a new or use an existing middleware component.

Bus

To observe bus life cycle events, create a class which implements IBusObserver. To configure a bus observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddBusObserver<BusObserver>();
services.AddBusObserver(provider => new BusObserver());

Receive Endpoint

To configure a receive endpoint observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddReceiveEndpointObserver<ReceiveEndpointObserver>();
services.AddReceiveEndpointObserver(provider => new ReceiveEndpointObserver());

Pipeline Observers

Receive

To observe messages as they are received by the transport, create a class that implements the IReceiveObserver interface, and connect it to the bus as shown below.

To configure a receive observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectReceiveObserver bus method can be used instead.

services.AddReceiveObserver<ReceiveObserver>();
services.AddReceiveObserver(provider => new ReceiveObserver());

Consume

If the ReceiveContext isn't fascinating enough for you, perhaps the actual consumption of messages might float your boat. A consume observer implements the IConsumeObserver interface, as shown below.

To configure a consume observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectConsumeObserver bus method can be used instead.

services.AddConsumeObserver<ConsumeObserver>();
services.AddConsumeObserver(provider => new ConsumeObserver());

Consume Message

Okay, so it's obvious that if you've read this far you want a more specific observer, one that only is called when a specific message type is consumed. We have you covered there too, as shown below.

To connect the observer, use the ConnectConsumeMessageObserver method before starting the bus.

The ConsumeMessageObserver<T> interface may be deprecated at some point, it's sort of a legacy observer that isn't recommended.

Send

Okay, so, incoming messages are not your thing. We get it, you're all about what goes out. It's cool. It's better to send than to receive. Or is that give? Anyway, a send observer is also available.

To configure a send observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectSendObserver bus method can be used instead.

services.AddSendObserver<SendObserver>();
services.AddSendObserver(provider => new SendObserver());

Publish

In addition to send, publish is also observable. Because the semantics matter, absolutely. Using the MessageId to link them up as it's unique for each message. Remember that Publish and Send are two distinct operations so if you want to observe all messages that are leaving your service, you have to connect both Publish and Send observers.

To configure a public observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectPublishObserver bus method can be used instead.

services.AddPublishObserver<PublishObserver>();
services.AddPublishObserver(provider => new PublishObserver());

State Machine Observers

Event

To observe events consumed by a saga state machine, use an IEventObserver<T> where T is the saga instance type.

To configure an event observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddEventObserver<T, EventObserver<T>>();
services.AddEventObserver<T>(provider => new EventObserver<T>());

State

To observe state changes that happen in a saga state machine, use an IStateObserver<T> where T is the saga instance type.

To configure a state observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddStateObserver<T, StateObserver<T>>();
services.AddStateObserver<T>(provider => new StateObserver<T>());