AWS.Messaging 0.9.1
前缀已保留
dotnet add package AWS.Messaging --version 0.9.1
NuGet\Install-Package AWS.Messaging -Version 0.9.1
<PackageReference Include="AWS.Messaging" Version="0.9.1" />
paket add AWS.Messaging --version 0.9.1
#r "nuget: AWS.Messaging, 0.9.1"
// Install AWS.Messaging as a Cake Addin #addin nuget:?package=AWS.Messaging&version=0.9.1 // Install AWS.Messaging as a Cake Tool #tool nuget:?package=AWS.Messaging&version=0.9.1
AWS .NET 消息处理框架
注意: 此库处于 开发者预览 状态。它提供了对即将在《AWS .NET 消息处理框架》中出现的功能的早期访问。1.0.0 之前的所有版本可能会包含破坏性更改。
AWS .NET 消息处理框架 是一个 AWS 原生框架,简化了使用 AWS 服务(如 Amazon Simple Queue Service (SQS)、Amazon Simple Notification Service (SNS) 和 Amazon EventBridge)的 .NET 消息处理应用程序的开发。该框架减少了开发者必须编写的样板代码,使您可以在发布和消费消息时专注于业务逻辑。
- 对于发布者,该框架将消息从 .NET 对象序列化为符合 CloudEvents 的消息,并将其封装在特定于服务的 AWS 消息中。然后,它将消息发布到配置的 SQS 队列、SNS 主题或 EventBridge 事件总线。
- 对于用户,框架将消息反序列化为其.NET对象,并将其路由到相应的业务逻辑。框架在消息被处理的过程中跟踪消息的可见性(以避免重复处理消息),并在完成时将从队列中删除消息。该框架支持在同一应用程序中以长轮询过程和AWS Lambda函数方式消费消息。
项目状态
该框架目前处于 开发者预览 阶段。以下功能得到支持
- 向SQS、SNS和EventBridge发布消息
- 在长轮询过程中处理SQS消息
- 在AWS Lambda函数中处理SQS消息
- 处理来自 FIFO(先进先出)队列 的消息,并尊重分组排序
- OpenTelemetry 仪表板
- 自定义序列化
- 性能和错误强化
待添加功能
- AWS X-Ray 仪表板
入门指南
将 AWS.Messaging
NuGet 包添加到您的项目中
dotnet add package AWS.Messaging --prerelease
该框架集成到.NET的 依赖注入(DI)服务容器 中。您可以在应用程序启动时通过调用 AddAWSMessageBus
来将其添加到DI容器中。
var builder = WebApplication.CreateBuilder(args);
// Register the AWS Message Processing Framework for .NET
builder.Services.AddAWSMessageBus(builder =>
{
// Register that you'll publish messages of type ChatMessage to an existing queue
builder.AddSQSPublisher<ChatMessage>("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd");
});
该框架支持在不同的应用程序中发布一个或多个消息类型,处理一个或多个消息类型,或者两者兼具。
发布消息
以下代码演示了一个将为不同的AWS服务发布不同消息类型的应用程序的配置。
var builder = WebApplication.CreateBuilder(args);
// Register the AWS Message Processing Framework for .NET
builder.Services.AddAWSMessageBus(builder =>
{
// Register that you'll publish messages of type ChatMessage to an existing queue
builder.AddSQSPublisher<ChatMessage>("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd");
// Register that you'll publish messages of type OrderInfo to an existing SNS topic
builder.AddSNSPublisher<OrderInfo>("arn:aws:sns:us-west-2:012345678910:MyAppProd");
// Register that you'll publish messages of type FoodItem to an existing EventBridge bus
builder.AddEventBridgePublisher<FoodItem>("arn:aws:events:us-west-2:012345678910:event-bus/default");
});
在启动时注册框架后,将通用的 IMessagePublisher
注入到您的代码中。调用其 PublishAsync
方法来发布上述配置的任何消息类型。通用发布者将根据消息类型确定消息的发送目标。
在以下示例中,一个ASP.NET MVC控制器从用户接收 ChatMessage
消息和 OrderInfo
事件,并将其分别发布到SQS和SNS。这两种消息类型都可以使用上配置的通用发布者发布。
[ApiController]
[Route("[controller]")]
public class PublisherController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public PublisherController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpPost("chatmessage", Name = "Chat Message")]
public async Task<IActionResult> PublishChatMessage([FromBody] ChatMessage message)
{
// Perform business and validation logic on the ChatMessage here
if (message == null)
{
return BadRequest("A chat message was not submitted. Unable to forward to the message queue.");
}
if (string.IsNullOrEmpty(message.MessageDescription))
{
return BadRequest("The MessageDescription cannot be null or empty.");
}
// Publish the ChatMessage to SQS, using the generic publisher
await _messagePublisher.PublishAsync(message);
return Ok();
}
[HttpPost("order", Name = "Order")]
public async Task<IActionResult> PublishOrder([FromBody] OrderInfo message)
{
if (message == null)
{
return BadRequest("An order was not submitted.");
}
// Publish the OrderInfo to SNS, using the generic publisher
await _messagePublisher.PublishAsync(message);
return Ok();
}
}
特定于服务的发布者
上面的示例使用了通用的 IMessagePublisher
,它可以基于配置的消息类型发布到任何支持AWS服务。该框架还提供了SQS、SNS和EventBridge的特定发布者。这些特定发布者公开了仅适用于该服务的选项,可以通过 ISQSPublisher
、ISNSPublisher
和 IEventBridgePublisher
类型进行注入。
例如,当向SQS FIFO队列发布消息时,必须设置适当的 消息分组ID。以下代码再次演示了 ChatMessage
示例,但这次使用了 ISQSPublisher
来设置SQS特定选项。
public class PublisherController : ControllerBase
{
private readonly ISQSPublisher _sqsPublisher;
public PublisherController(ISQSPublisher sqsPublisher)
{
_sqsPublisher = sqsPublisher;
}
[HttpPost("chatmessage", Name = "Chat Message")]
public async Task<IActionResult> PublishChatMessage([FromBody] ChatMessage message)
{
// Perform business and validation logic on the ChatMessage here
if (message == null)
{
return BadRequest("A chat message was not submitted. Unable to forward to the message queue.");
}
if (string.IsNullOrEmpty(message.MessageDescription))
{
return BadRequest("The MessageDescription cannot be null or empty.");
}
// Send the ChatMessage to SQS using the injected ISQSPublisher, with SQS-specific options
await _sqsPublisher.SendAsync(message, new SQSOptions
{
DelaySeconds = <delay-in-seconds>,
MessageAttributes = <message-attributes>,
MessageDeduplicationId = <message-deduplication-id>,
MessageGroupId = <message-group-id>
});
return Ok();
}
}
对于SNS和EventBridge,可以使用相应的 ISNSPublisher
和 IEventBridgePublisher
以相同的方式完成。
await _snsPublisher.PublishAsync(message, new SNSOptions
{
Subject = <subject>,
MessageAttributes = <message-attributes>,
MessageDeduplicationId = <message-deduplication-id>,
MessageGroupId = <message-group-id>
});
await _eventBridgePublisher.PublishAsync(message, new EventBridgeOptions
{
DetailType = <detail-type>,
Resources = <resources>,
Source = <source>,
Time = <time>,
TraceHeader = <trace-header>
});
消费消息
要消费消息,需要为要处理的每个消息类型实现一个消息处理器,使用 IMessageHandler
接口。消息类型与消息处理器之间的映射在项目启动时进行配置。
await Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
// Register the AWS Message Processing Framework for .NET
services.AddAWSMessageBus(builder =>
{
// Register an SQS Queue that the framework will poll for messages
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd");
// Register all IMessageHandler implementations with the message type they should process.
// Here messages that match our ChatMessage .NET type will be handled by our ChatMessageHandler
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
});
})
.Build()
.RunAsync();
以下代码显示了一个用于 ChatMessage
消息的示例消息处理器。
public class ChatMessageHandler : IMessageHandler<ChatMessage>
{
public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ChatMessage> messageEnvelope, CancellationToken token = default)
{
// Add business and validation logic here
if (messageEnvelope == null)
{
return Task.FromResult(MessageProcessStatus.Failed());
}
if (messageEnvelope.Message == null)
{
return Task.FromResult(MessageProcessStatus.Failed());
}
ChatMessage message = messageEnvelope.Message;
Console.WriteLine($"Message Description: {message.MessageDescription}");
// Return success so the framework will delete the message from the queue
return Task.FromResult(MessageProcessStatus.Success());
}
}
外部的 MessageEnvelope
包含框架使用的元数据。它的 message
属性是消息类型(在这种情况下为 ChatMessage
)。
可以通过返回 MessageProcessStatus.Success()
来指示已成功处理消息,并且框架将从SQS队列中删除该消息。当返回 MessageProcessStatus.Failed()
时,消息将保留在队列中,可以再次处理,或者根据配置将其移动到 死信队列
在长期运行过程中处理消息
您可以使用带有 SQS 队列 URL 的 AddSQSPoller
来启动一个长运行的 BackgroundService
,该服务将连续轮询队列并处理消息。
await Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
// Register the AWS Message Processing Framework for .NET
services.AddAWSMessageBus(builder =>
{
// Register an SQS Queue that the framework will poll for messages
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options =>
{
// The maximum number of messages from this queue that the framework will process concurrently on this client
options.MaxNumberOfConcurrentMessages = 10;
// The duration each call to SQS will wait for new messages
options.WaitTimeSeconds = 20;
});
// Register all IMessageHandler implementations with the message type they should process
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
});
})
.Build()
.RunAsync();
配置 SQS 消息轮询器
当调用 AddSQSPoller
时,可以使用 SQSMessagePollerOptions
配置 SQS 消息轮询器。
MaxNumberOfConcurrentMessages
- 从队列中同时处理的最大消息数量。默认值为10
。WaitTimeSeconds
-ReceiveMessage
SQS 调用等待消息到达队列并返回的时间长度(以秒为单位)。如果消息可用,则调用会在少于WaitTimeSeconds
的情况下返回。默认值是20
。
处理消息可见性超时
当消费者开始处理特定消息时,SQS 消息会有一个 可见性超时 期间。它将保持队列中但隐藏在其他消费者后面,以避免多次处理。如果消息在再次可见之前未处理和删除,则另一个消费者可能尝试处理同一消息。
框架将跟踪并尝试扩展其正在处理的 messages 的可见性超时。您可以在调用 AddSQSPoller
时,在 SQSMessagePollerOptions
上配置此行为。
VisibilityTimeout
- 已接收消息在后续检索请求中隐藏的时间长度(以秒为单位)。默认值是30
。VisibilityTimeoutExtensionThreshold
- 当消息的可见性超时在其许多秒内到期时,框架将延长可见性超时(再延长VisibilityTimeout
秒)。默认值是5
。VisibilityTimeoutExtensionHeartbeatInterval
- 框架在多少秒内检查处于VisibilityTimeoutExtensionThreshold
秒内的消息,然后延长其可见性超时。默认值是1
。
以下示例中,框架将每 1 秒检查一次仍在处理的消息。对于那些在 5 秒内再次可见的消息,框架将自动将每个消息的可见性超时再延长 30 秒。
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options =>
{
options.VisibilityTimeout = 30;
options.VisibilityTimeoutExtensionThreshold = 5;
VisibilityTimeoutExtensionHeartbeatInterval = 1;
});
处理 AWS Lambda 函数中的消息
您可以使用与 Lambda 集成的 SQS。这是由 AWS.Messaging.Lambda
包提供的。请参阅其 README 以开始。
SQS 轮询器弹性
设计上 SQS 轮询器是弹性的,能够以两种方式处理底层 .NET SDK 以及框架本身抛出的错误。我们已经将由于用户端配置无效而可能发生的许多异常分类为致命异常。这些异常将在抛出友好的错误消息后停止 SQS 轮询器后台服务的运行。但是,除了我们已经定义的致命异常之外,任何抛出的异常都不会导致 SQS 轮询器出错,并且将在底层服务遇到性能下降或中断的情况下保持弹性。SQS 轮询器通过应用一段时间延迟来重试失败的 SQS 请求。
框架定义了两个接口,IBackoffHandler
和 IBackoffPolicy
。IBackoffPolicy
与实现 IBackoffHandler
的 BackoffHandler
密切相关。默认的 IBackoffHandler
实现会检查附加的 IBackoffPolicy
来确定是否应该应用回退。如果应该应用回退,IBackoffPolicy
也会返回重试间的时间延迟。
框架支持三种回退策略
None
,这会禁用回退处理器。这将使用户能够完全依赖 SDK 的重试逻辑。Interval
,会在给定的和可配置的间隔内进行退避。默认值是1秒。CappedExponential
,会进行指数退避,直到达到一定的可配置最大退避时间,此时会切换到间隔退避。默认的最大退避时间界限是1小时。
默认情况下,如果没有进行用户配置,SQS Poller将使用IBackoffHandler
接口的默认实现,并与具有上限的指数退避策略结合使用。
用户可以按以下方式更改退避策略
services.AddAWSMessageBus(builder =>
{
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/536721586275/MPF");
// Optional: Configure the backoff policy used by the SQS Poller.
builder.ConfigureBackoffPolicy(options =>
{
// Use 1 of the available 3 backoff policies:
// No backoff Policy
options.UseNoBackoff();
// Interval backoff policy
options.UseIntervalBackoff(x =>
{
x.FixedInterval = 1;
});
// Capped exponential backoff policy
options.UseCappedExponentialBackoff(x =>
{
x.CapBackoffTime = 60;
});
});
});
用户还可以通过实现IBackoffHandler
接口并在此将AWS消息总线注入到DI容器之前来实现自己的退避处理器。以下是实现方式
services.TryAddSingleton<IBackoffHandler, CustomBackoffHandler>();
services.AddAWSMessageBus(builder =>
{
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MPF");
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>("chatMessage");
});
作为一个例子,您可以使用Polly来处理重试。一个实现此功能的示例应用程序可以在这里找到。
遥测
.NET的AWS消息处理框架已针对OpenTelemetry进行配置,以记录每个由框架发布的或处理的消息的跟踪。这由AWS.Messaging.Telemetry.OpenTelemetry
包提供。请参阅其README以开始使用。
定制
框架在三个不同的“层”中构建、发送和处理消息
- 在最外层,框架构建AWS原生请求或响应,这些请求或响应针对特定服务。例如,与SQS一起,构建
SendMessage
请求,并处理由服务定义的Message
对象。 - 在SQS请求和响应内部,将
MessageBody
元素(对于SNS为Message
,对于EventBridge为Detail
)设置为JSON格式的CloudEvent。此内容包含框架设置的元数据,当处理消息时可以在MessageEnvelope
对象上访问。 - 在内层,CloudEvent JSON对象内的
data
属性包含消息发送或接收时的.NET对象的JSON序列化。
{
"id":"b02f156b-0f02-48cf-ae54-4fbbe05cffba",
"source":"/aws/messaging",
"specversion":"1.0",
"type":"Publisher.Models.ChatMessage",
"time":"2023-11-21T16:36:02.8957126+00:00",
"data":"<the ChatMessage object serialized as JSON>"
}
您可以根据需要自定义消息信封的配置和读取方式
"id"
唯一标识消息。默认情况下,它被设置为一个新的GUID,但这可以通过实现自己的IMessageIdGenerator
并将其注入到DI容器中来自定义。"type"
控制消息如何路由到处理器。默认情况下,这使用与消息对应的.NET类型的全名。您可以通过在将消息类型映射到目的地时使用messageTypeIdentifier
参数来覆盖此功能,通过AddSQSPublisher
、AddSNSPublisher
或AddEventBridgePublisher
。"source"
指示哪个系统或服务器发送了消息。- 如果从AWS Lambda发布,这将是指定的函数名,如果在Amazon ECS上,则是指定的集群名称和任务ARN,如果在Amazon EC2上,则是指定的实例ID,否则将使用回退值
/aws/messaging
。 - 您可以通过在
MessageBusBuilder
上的AddMessageSource
或AddMessageSourceSuffix
中覆盖此功能。
- 如果从AWS Lambda发布,这将是指定的函数名,如果在Amazon ECS上,则是指定的集群名称和任务ARN,如果在Amazon EC2上,则是指定的实例ID,否则将使用回退值
"time"
设置为UTC中的当前DateTime。您可以实现自己的IDateTimeHandler
并将其注入到DI容器中来自定义。"data"
包含发送或接收的消息的.NET对象的JSON表示在
MessageBusBuilder
上配置ConfigureSerializationOptions
允许您配置用于序列化和反序列化消息的System.Text.Json.JsonSerializerOptions
。- 在框架构建完消息后,要注入额外的属性或转换消息封套,您可以实现
ISerializationCallback
,并通过在MessageBusBuilder
上调用AddSerializationCallback
进行注册。
获取帮助
有关对该框架的功能请求或问题,请在此存储库中打开 问题。
贡献
我们欢迎社区贡献和拉取请求。有关如何提交代码的信息,请参阅CONTRIBUTING.md。
安全
.NET版本的AWS消息处理框架依赖于AWS SDK for .NET与AWS通信。有关更多信息,请参阅安全部分,位于AWS SDK for .NET 开发者指南中。您也可以在AWS Message Processing Framework for .NET 开发者指南中找到更多信息。
出于安全考虑,该框架不会记录用户发送的数据消息。如果用户想为了调试目的启用此功能,您需要在AWS消息总线中调用以下方法EnableMessageContentLogging()
builder.Services.AddAWSMessageBus(bus =>
{
builder.EnableMessageContentLogging();
});
如果发现潜在的安全问题,请参考安全策略以报告信息。
其他资源
- AWS Message Processing Framework for .NET 设计文档
- 示例应用程序 - 包含发布者服务、长期运行订阅者服务、Lambda函数处理程序和用于覆盖框架内置退避逻辑的Polly的示例应用程序。
- 开发者指南
- API参考
- 介绍 AWS Message Processing Framework for .NET(预览)博客文章 - 演示如何创建简单应用程序来发送和接收SQS消息。
许可证
本项目使用Apache-2.0许可证。
产品 | 版本 兼容和额外的计算目标框架版本。 |
---|---|
.NET | net6.0兼容。 net6.0-android已计算。 net6.0-ios已计算。 net6.0-maccatalyst已计算。 net6.0-macos已计算。 net6.0-tvos已计算。 net6.0-windows已计算。 net7.0已计算。 net7.0-android已计算。 net7.0-ios已计算。 net7.0-maccatalyst已计算。 net7.0-macos已计算。 net7.0-tvos已计算。 net7.0-windows已计算。 net8.0已计算。 net8.0-android已计算。 net8.0-browser已计算。 net8.0-ios已计算。 net8.0-maccatalyst已计算。 net8.0-macos已计算。 net8.0-tvos已计算。 net8.0-windows已计算。 |
-
net6.0
- AWSSDK.EventBridge (>= 3.7.302.40)
- AWSSDK.Extensions.NETCore.Setup (>= 3.7.300)
- AWSSDK.SimpleNotificationService (>= 3.7.301.22)
- AWSSDK.SQS (>= 3.7.300.74)
- Microsoft.Extensions.DependencyInjection (>= 6.0.1)
- Microsoft.Extensions.Hosting.Abstractions (>= 6.0.0)
- Microsoft.Extensions.Http (>= 6.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 6.0.4)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 6.0.0)
NuGet包 (2)
显示依赖AWS.Messaging的Top 2个NuGet包
包 | 下载 |
---|---|
AWS.Messaging.Lambda 此包是AWS消息处理框架的.NET插件,允许.NET Lambda函数处理框架发布的消息。 |
|
AWS.Messaging.Telemetry.OpenTelemetry 这是一个用于收集发送和接收的消息跟踪的软件包,该软件包为.NET消息处理框架提供仪器库。 |
GitHub 仓库
此软件包未被任何流行的GitHub仓库使用。
版本 | 下载 | 最后更新 |
---|---|---|
0.9.1 | 5,678 | 4/22/2024 |
0.9.0 | 4,130 | 3/26/2024 |
0.3.0-beta | 103 | 3/20/2024 |
0.2.0-beta | 131 | 3/8/2024 |
0.1.0-beta | 360 | 12/8/2023 |