SQL/DB

The SQL database transport is implemented for both PostgreSQL and SQL Server.

Transport Functions

MassTransit uses PostgreSQL functions or SQL Server stored procedures for all transport operations. These functions are built to manage the constraints of the transport, such as message locking, redelivery, and subscription.

For completeness, the functions are detailed below.

Create Queue

create_queue(queue_name text, 
             auto_delete integer DEFAULT NULL)

Creates a queue along with its associated error and dead-letter queues. The three resulting queues share the queue name, and each has its own queue type.
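As a rough illustration, the call below creates a queue from a PostgreSQL session. The schema name `transport` and the queue name `orders` are assumptions made for the example; substitute the schema the transport was actually installed into.

```sql
-- Assumption: the transport functions live in a schema named "transport".
-- Creates the hypothetical "orders" queue together with its error and
-- dead-letter queues. auto_delete is left at its default (NULL).
SELECT transport.create_queue('orders');
```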

Create Topic

create_topic(topic_name text)

Creates a topic.

Create Topic Subscription

create_topic_subscription(source_topic_name text, 
                          destination_topic_name text, 
                          type integer,
                          routing_key text DEFAULT '',
                          filter jsonb DEFAULT '{}')

Creates a subscription from one topic to another topic.

Create Queue Subscription

create_queue_subscription(source_topic_name text, 
                          destination_queue_name text, 
                          type integer,
                          routing_key text DEFAULT '',
                          filter jsonb DEFAULT '{}')

Creates a subscription from a topic to a queue.
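The topology functions can be combined as in the following sketch. All entity names are hypothetical and the `transport` schema is an assumption. The type values follow the Subscription Type table (1 = All, 2 = Routing Key).

```sql
-- Assumption: functions are installed in a schema named "transport".
SELECT transport.create_topic('order-events');
SELECT transport.create_topic('all-events');
SELECT transport.create_queue('billing');
SELECT transport.create_queue('billing-eu');

-- Topic to topic, subscription type 1 (All).
SELECT transport.create_topic_subscription('order-events', 'all-events', 1);

-- Topic to queue, subscription type 1 (All).
SELECT transport.create_queue_subscription('order-events', 'billing', 1);

-- Topic to queue, subscription type 2 (Routing Key), using the
-- hypothetical routing key 'eu'.
SELECT transport.create_queue_subscription(
    'order-events', 'billing-eu', 2, routing_key => 'eu');
```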

Purge Queue

purge_queue(queue_name text)

Removes all messages from a queue, including messages in the error and dead-letter sub-queues.

Fetch Messages

fetch_messages(queue_name text,
               fetch_consumer_id uuid,
               fetch_lock_id uuid,
               lock_duration interval,
               fetch_count integer DEFAULT 1)

Fetches messages from the specified queue.
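A minimal sketch of a fetch call, assuming the `transport` schema and a queue named `orders`. The UUID literal is a placeholder; a caller would normally generate its own lock id and keep it, because the later delete, renew, move, and unlock calls require the same lock_id.

```sql
-- Assumption: schema "transport", queue "orders".
-- Fetch up to 10 messages and lock them for one minute.
SELECT *
FROM transport.fetch_messages(
    queue_name        => 'orders',
    fetch_consumer_id => gen_random_uuid(),
    fetch_lock_id     => 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid,  -- placeholder lock id
    lock_duration     => INTERVAL '1 minute',
    fetch_count       => 10);
```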

Fetch Messages Partitioned

fetch_messages_partitioned(queue_name text,
                           fetch_consumer_id uuid,
                           fetch_lock_id uuid,
                           lock_duration interval,
                           fetch_count integer DEFAULT 1,
                           concurrent_count integer DEFAULT 1,
                           ordered integer DEFAULT 0)

Fetches messages from the specified queue using the partitioned receive mode.
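The partitioned variant is called the same way, with two extra arguments. In the sketch below the `transport` schema and `orders` queue are assumptions, and the comments on concurrent_count and ordered describe an assumed interpretation of those parameters rather than documented behavior.

```sql
-- Assumption: schema "transport", queue "orders".
SELECT *
FROM transport.fetch_messages_partitioned(
    queue_name        => 'orders',
    fetch_consumer_id => gen_random_uuid(),
    fetch_lock_id     => 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid,  -- placeholder lock id
    lock_duration     => INTERVAL '1 minute',
    fetch_count       => 10,
    concurrent_count  => 1,   -- assumed: concurrent deliveries allowed per partition key
    ordered           => 1);  -- assumed: 1 requests ordered delivery within a partition
```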

Delete Message

delete_message(message_delivery_id bigint, 
               lock_id uuid)

Deletes a message that was previously fetched with the specified lock_id.

Renew Message Lock

renew_message_lock(message_delivery_id bigint, 
                   lock_id uuid, 
                   duration interval)

Renews (extends) the lock on a message that was previously fetched with the specified lock_id.
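The two calls above might be used together as sketched here. The message_delivery_id value 42 and the lock id are placeholders standing in for values obtained from an earlier fetch, and the `transport` schema is an assumption.

```sql
-- Assumption: schema "transport"; 42 and the UUID are placeholder values
-- for a delivery that was fetched with this lock id.

-- Processing is taking longer than expected: extend the lock by a minute.
SELECT transport.renew_message_lock(
    42, 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, INTERVAL '1 minute');

-- Processing succeeded: remove the message.
SELECT transport.delete_message(
    42, 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid);
```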

Move Message

move_message(message_delivery_id bigint, 
             lock_id uuid, 
             queue_name text, 
             queue_type integer, 
             headers jsonb)

Moves a message that was previously fetched with the specified lock_id to the destination queue, adding the headers to the message delivery. Typically used by the receive endpoint to either dead-letter (skipped) or fault (error) a message.
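For illustration, a faulted message could be moved to the error queue roughly as follows. The queue type value (2 for error) is an assumption not stated in this section, as are the `transport` schema, the placeholder ids, and the hypothetical `x-example-reason` header.

```sql
-- Assumptions: schema "transport"; queue type 2 denotes the error queue;
-- 42 and the UUID are placeholders from an earlier fetch.
SELECT transport.move_message(
    message_delivery_id => 42,
    lock_id             => 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid,
    queue_name          => 'orders',
    queue_type          => 2,
    headers             => '{"x-example-reason": "consumer faulted"}'::jsonb);
```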

Unlock Message

unlock_message(message_delivery_id bigint, 
               lock_id uuid,
               delay interval,
               headers jsonb)

Unlocks a message that was previously fetched with the specified lock_id, adding the specified delay to the enqueue_time and any additional headers. Typically used when delayed message redelivery is used, or when faults are thrown back to the transport.
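A sketch of a delayed redelivery: the message is released and becomes available again after 30 seconds. The ids are placeholders, the header name is hypothetical, and the `transport` schema is an assumption.

```sql
-- Assumption: schema "transport"; 42 and the UUID come from an earlier fetch.
SELECT transport.unlock_message(
    message_delivery_id => 42,
    lock_id             => 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid,
    delay               => INTERVAL '30 seconds',
    headers             => '{"x-example-attempt": "1"}'::jsonb);
```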

Send Message

send_message(entity_name text,
             priority integer DEFAULT NULL,
             transport_message_id uuid DEFAULT gen_random_uuid(),
             body jsonb DEFAULT NULL,
             binary_body bytea DEFAULT NULL,
             content_type text DEFAULT NULL,
             message_type text DEFAULT NULL,
             message_id uuid DEFAULT NULL,
             correlation_id uuid DEFAULT NULL,
             conversation_id uuid DEFAULT NULL,
             request_id uuid DEFAULT NULL,
             initiator_id uuid DEFAULT NULL,
             source_address text DEFAULT NULL,
             destination_address text DEFAULT NULL,
             response_address text DEFAULT NULL,
             fault_address text DEFAULT NULL,
             sent_time timestamptz DEFAULT NULL,
             headers jsonb DEFAULT NULL,
             host jsonb DEFAULT NULL,
             partition_key text DEFAULT NULL,
             routing_key text DEFAULT NULL,
             delay interval DEFAULT INTERVAL '0 seconds',
             scheduling_token_id uuid DEFAULT NULL,
             max_delivery_count int DEFAULT 10)

Sends a message to the entity_name queue with all the specified properties.
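Because most parameters have defaults, PostgreSQL's named notation keeps a send call short. The example below is only a sketch: the `transport` schema, the `orders` queue, and the payload are assumptions, and a consumer would need a serializer configuration that accepts a body in this shape.

```sql
-- Assumption: schema "transport", queue "orders", hypothetical payload.
SELECT transport.send_message(
    entity_name  => 'orders',
    body         => '{"orderId": 123}'::jsonb,
    content_type => 'application/json',
    message_id   => gen_random_uuid());
```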

Publish Message

publish_message(entity_name text,
                priority integer DEFAULT NULL,
                transport_message_id uuid DEFAULT gen_random_uuid(),
                body jsonb DEFAULT NULL,
                binary_body bytea DEFAULT NULL,
                content_type text DEFAULT NULL,
                message_type text DEFAULT NULL,
                message_id uuid DEFAULT NULL,
                correlation_id uuid DEFAULT NULL,
                conversation_id uuid DEFAULT NULL,
                request_id uuid DEFAULT NULL,
                initiator_id uuid DEFAULT NULL,
                source_address text DEFAULT NULL,
                destination_address text DEFAULT NULL,
                response_address text DEFAULT NULL,
                fault_address text DEFAULT NULL,
                sent_time timestamptz DEFAULT NULL,
                headers jsonb DEFAULT NULL,
                host jsonb DEFAULT NULL,
                partition_key text DEFAULT NULL,
                routing_key text DEFAULT NULL,
                delay interval DEFAULT INTERVAL '0 seconds',
                scheduling_token_id uuid DEFAULT NULL,
                max_delivery_count int DEFAULT 10)

Publishes a message to the entity_name topic with all the specified properties. Queues with matching subscriptions to the topic will each receive an instance of the message.
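Publishing looks the same as sending, except that entity_name refers to a topic. In this sketch the `transport` schema, the `order-events` topic, the payload, and the routing key `eu` are all assumptions. The routing key only matters to subscriptions of type Routing Key or Pattern.

```sql
-- Assumption: schema "transport", topic "order-events", hypothetical payload.
SELECT transport.publish_message(
    entity_name  => 'order-events',
    body         => '{"orderId": 123, "status": "placed"}'::jsonb,
    content_type => 'application/json',
    routing_key  => 'eu');
```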

Delete Scheduled Message

delete_scheduled_message(token_id uuid)

Deletes a previously scheduled message using the token_id.
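One way to picture scheduling and cancellation, assuming the `transport` schema and an `orders` queue: the message is sent with a delay and a caller-chosen scheduling_token_id, and the same token is later used to delete it. The token value and payload are placeholders.

```sql
-- Assumption: schema "transport", queue "orders".
-- Schedule a message for delivery in one hour, tagged with a known token.
SELECT transport.send_message(
    entity_name         => 'orders',
    body                => '{"orderId": 123}'::jsonb,
    content_type        => 'application/json',
    delay               => INTERVAL '1 hour',
    scheduling_token_id => 'b1ffcd00-1d2c-4fa9-8c7e-7cc0ce491b22'::uuid);

-- Cancel it before it becomes due.
SELECT transport.delete_scheduled_message(
    'b1ffcd00-1d2c-4fa9-8c7e-7cc0ce491b22'::uuid);
```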

Touch Queue

touch_queue(queue_name text)

When a receive endpoint on an auto-delete queue hasn't received any messages for a period of time, it calls this function to add metrics for the queue so that the queue isn't automatically deleted.

Process Metrics

process_metrics(row_limit int DEFAULT 10000)

A background function that processes queue metrics, including messages consumed, faulted, and dead-lettered. It is called automatically by idle receive endpoints. Metrics can be viewed using the queues view.

Purge Topology

purge_topology()

A background function used to purge any auto-delete queues that have reached their idle threshold. Automatically called by idle receive endpoints.
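Both background functions are normally invoked by receive endpoints, but they can presumably also be run by hand, for example from a maintenance session. The `transport` schema is an assumption.

```sql
-- Assumption: schema "transport".
-- Process at most 1000 pending metric rows in this pass.
SELECT transport.process_metrics(row_limit => 1000);

-- Remove auto-delete queues that have passed their idle threshold.
SELECT transport.purge_topology();
```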

End-User Functions

Requeue Messages

requeue_messages(queue_name text, 
                 source_queue_type int, 
                 target_queue_type int, 
                 message_count int,
                 delay interval DEFAULT INTERVAL '0 seconds', 
                 redelivery_count int DEFAULT 10)

Use this function to move messages from the error or dead-letter queue back to the main queue. Up to message_count messages are moved. An optional delay, added to the current time, postpones redelivery. The redelivery_count is added to each message's max_delivery_count so that a message whose delivery count had already reached the limit can be delivered again.
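A sketch of a bulk requeue from the error queue. The numeric queue types (1 = main, 2 = error) are an assumption not stated in this section, as are the `transport` schema and the `orders` queue name.

```sql
-- Assumptions: schema "transport"; queue type 1 = main, 2 = error.
-- Move up to 100 messages from the error queue back to the main queue,
-- delay them by 10 seconds, and allow 5 more delivery attempts each.
SELECT transport.requeue_messages(
    queue_name        => 'orders',
    source_queue_type => 2,
    target_queue_type => 1,
    message_count     => 100,
    delay             => INTERVAL '10 seconds',
    redelivery_count  => 5);
```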

Requeue Message

requeue_message(message_delivery_id bigint,
                target_queue_type int,
                delay interval DEFAULT INTERVAL '0 seconds',
                redelivery_count int DEFAULT 10)

Use this function to move a single message from the error or dead-letter queue back to the main queue. An optional delay, added to the current time, postpones redelivery. The redelivery_count is added to the message's max_delivery_count so that a message whose delivery count had already reached the limit can be delivered again.
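For a single message the call might look like this. The message_delivery_id 42 is a placeholder, and the target queue type 1 (main) and the `transport` schema are assumptions.

```sql
-- Assumptions: schema "transport"; queue type 1 = main; 42 is a placeholder id.
SELECT transport.requeue_message(
    message_delivery_id => 42,
    target_queue_type   => 1,
    redelivery_count    => 3);
```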

Views

Queues

Returns details about the queues and their metrics.

Subscriptions

Returns details about all subscriptions and their settings.
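Both views can be queried like ordinary tables. The view names `queues` and `subscriptions` are inferred from the headings above, and the `transport` schema is an assumption. No column names are listed here, so the sketch selects everything.

```sql
-- Assumption: views named "queues" and "subscriptions" in schema "transport".
SELECT * FROM transport.queues;
SELECT * FROM transport.subscriptions;
```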

Tables

The tables used by the SQL transport include:

Queue

Topic

Topic Subscription

Subscription Type

Type  Description
1     All
2     Routing Key
3     Pattern (uses Routing Key)

Queue Subscription

Message

Message Delivery