Amazon SQS Configuration
MassTransit combines Amazon SQS (Simple Queue Service) with SNS (Simple Notification Service) to provide both send and publish support.
Configuring a receive endpoint will use the message topology to create and subscribe SNS topics to SQS queues so that published messages will be delivered to the receive endpoint queue.
Minimal Example
In the example below, the Amazon SQS settings are configured.
namespace AmazonSqsConsoleListener;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Hosting;
public class Program
{
public static async Task Main(string[] args)
{
await Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host("us-east-2", h =>
{
h.AccessKey("your-iam-access-key");
h.SecretKey("your-iam-secret-key");
});
});
});
})
.Build()
.RunAsync();
}
}
Broker Topology
With SQS/SNS, which supports topics and queues, messages are sent or published to SNS Topics and then routes those messages through subscriptions to the appropriate SQS Queues.
When the bus is started, MassTransit will create SNS Topics and SQS Queues for the receive endpoint.
Configuration
The configuration includes:
- The Amazon SQS host
- Region name:
us-east-2
- Access key and secret key used to access the resources
- Region name:
Transport Options
All AWS SQS transport options can be configured using the .Host() method. The most commonly used settings can be configured via transport options.
services.AddOptions<AmazonSqsTransportOptions>()
.Configure(options =>
{
// configure options manually, but usually bind them to a configuration section
});
Property | Type | Description |
---|---|---|
Region | string | The AWS Region |
Scope | string | Will be used as a prefix for queue/topic name |
AccessKey | string | Access Key |
SecretKey | string | Access Secret |
Endpoint Configuration
some endpoint love
Additional Examples
Any topic can be subscribed to a receive endpoint, as shown below. The topic attributes can also be configured, in case the topic needs to be created.
services.AddMassTransit(x =>
{
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host("us-east-2", h =>
{
h.AccessKey("your-iam-access-key");
h.SecretKey("your-iam-secret-key");
});
cfg.ReceiveEndpoint("input-queue", e =>
{
// disable the default topic binding
e.ConfigureConsumeTopology = false;
e.Subscribe("event-topic", s =>
{
// set topic attributes
s.TopicAttributes["DisplayName"] = "Public Event Topic";
s.TopicSubscriptionAttributes["some-subscription-attribute"] = "some-attribute-value";
s.TopicTags.Add("environment", "development");
});
});
});
});
Errata
Scoping
Because there is only ever one "SQS/SNS" per AWS account it can be helpful to "Scope" your queues and topics. This will prefix all SQS queues and SNS topics with scope value.
services.AddMassTransit(x =>
{
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host("us-east-2", h =>
{
h.AccessKey("your-iam-access-key");
h.SecretKey("your-iam-secret-key");
// specify a scope for all topics
h.Scope("dev", true);
});
// additionally include the queues
cfg.ConfigureEndpoints(context, new DefaultEndpointNameFormatter("dev-", false));
});
});
Example IAM Policy
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SqsAccess",
"Effect": "Allow",
"Action": [
"sqs:SetQueueAttributes",
"sqs:ReceiveMessage",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue",
"sqs:DeleteQueue",
"sqs:TagQueue"
],
"Resource": "arn:aws:sqs:*:YOUR_ACCOUNT_ID:*"
},{
"Sid": "SnsAccess",
"Effect": "Allow",
"Action": [
"sns:GetTopicAttributes",
"sns:CreateTopic",
"sns:Publish",
"sns:Subscribe"
],
"Resource": "arn:aws:sns:*:YOUR_ACCOUNT_ID:*"
},{
"Sid": "SnsListAccess",
"Effect": "Allow",
"Action": [
"sns:ListTopics"
],
"Resource": "*"
}
]
}