AWS.Messaging 0.9.1

前缀已保留
dotnet add package AWS.Messaging --version 0.9.1                
NuGet\Install-Package AWS.Messaging -Version 0.9.1                
该命令旨在在 Visual Studio 的包管理器控制台中使用,因为它使用 NuGet 模块的 Install-Package 版本。
<PackageReference Include="AWS.Messaging" Version="0.9.1" />                
对于支持 PackageReference 的项目,将此 XML 节点复制到项目文件中以引用软件包。
paket add AWS.Messaging --version 0.9.1                
#r "nuget: AWS.Messaging, 0.9.1"                
#r 指令可用于 F# Interactive 和 Polyglot Notebooks。将其复制到交互式工具或脚本源代码中以引用软件包。
// Install AWS.Messaging as a Cake Addin
#addin nuget:?package=AWS.Messaging&version=0.9.1

// Install AWS.Messaging as a Cake Tool
#tool nuget:?package=AWS.Messaging&version=0.9.1                

AWS .NET 消息处理框架

nuget downloads build status

注意: 此库处于 开发者预览 状态。它提供了对即将在《AWS .NET 消息处理框架》中出现的功能的早期访问。1.0.0 之前的所有版本可能会包含破坏性更改。

AWS .NET 消息处理框架 是一个 AWS 原生框架,简化了使用 AWS 服务(如 Amazon Simple Queue Service (SQS)、Amazon Simple Notification Service (SNS) 和 Amazon EventBridge)的 .NET 消息处理应用程序的开发。该框架减少了开发者必须编写的样板代码,使您可以在发布和消费消息时专注于业务逻辑。

  • 对于发布者,该框架将消息从 .NET 对象序列化为符合 CloudEvents 的消息,并将其封装在特定于服务的 AWS 消息中。然后,它将消息发布到配置的 SQS 队列、SNS 主题或 EventBridge 事件总线。
  • 对于用户,框架将消息反序列化为其.NET对象,并将其路由到相应的业务逻辑。框架在消息被处理的过程中跟踪消息的可见性(以避免重复处理消息),并在完成时将从队列中删除消息。该框架支持在同一应用程序中以长轮询过程和AWS Lambda函数方式消费消息。

项目状态

该框架目前处于 开发者预览 阶段。以下功能得到支持

  • 向SQS、SNS和EventBridge发布消息
  • 在长轮询过程中处理SQS消息
  • 在AWS Lambda函数中处理SQS消息
  • 处理来自 FIFO(先进先出)队列 的消息,并尊重分组排序
  • OpenTelemetry 仪表板
  • 自定义序列化
  • 性能和错误强化

待添加功能

  • AWS X-Ray 仪表板

入门指南

AWS.Messaging NuGet 包添加到您的项目中

dotnet add package AWS.Messaging --prerelease

该框架集成到.NET的 依赖注入(DI)服务容器 中。您可以在应用程序启动时通过调用 AddAWSMessageBus 来将其添加到DI容器中。

var builder = WebApplication.CreateBuilder(args);

// Register the AWS Message Processing Framework for .NET
builder.Services.AddAWSMessageBus(builder =>
{
    // Register that you'll publish messages of type ChatMessage to an existing queue
    builder.AddSQSPublisher<ChatMessage>("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd");
});

该框架支持在不同的应用程序中发布一个或多个消息类型,处理一个或多个消息类型,或者两者兼具。

发布消息

以下代码演示了一个将为不同的AWS服务发布不同消息类型的应用程序的配置。

var builder = WebApplication.CreateBuilder(args);

// Register the AWS Message Processing Framework for .NET
builder.Services.AddAWSMessageBus(builder =>
{
    // Register that you'll publish messages of type ChatMessage to an existing queue
    builder.AddSQSPublisher<ChatMessage>("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd");

    // Register that you'll publish messages of type OrderInfo to an existing SNS topic
    builder.AddSNSPublisher<OrderInfo>("arn:aws:sns:us-west-2:012345678910:MyAppProd");

    // Register that you'll publish messages of type FoodItem to an existing EventBridge bus
    builder.AddEventBridgePublisher<FoodItem>("arn:aws:events:us-west-2:012345678910:event-bus/default");
});

在启动时注册框架后,将通用的 IMessagePublisher 注入到您的代码中。调用其 PublishAsync 方法来发布上述配置的任何消息类型。通用发布者将根据消息类型确定消息的发送目标。

在以下示例中,一个ASP.NET MVC控制器从用户接收 ChatMessage 消息和 OrderInfo事件,并将其分别发布到SQS和SNS。这两种消息类型都可以使用上配置的通用发布者发布。

[ApiController]
[Route("[controller]")]
public class PublisherController : ControllerBase
{
    private readonly IMessagePublisher _messagePublisher;

    public PublisherController(IMessagePublisher messagePublisher)
    {
        _messagePublisher = messagePublisher;
    }

    [HttpPost("chatmessage", Name = "Chat Message")]
    public async Task<IActionResult> PublishChatMessage([FromBody] ChatMessage message)
    {
        // Perform business and validation logic on the ChatMessage here
        if (message == null)
        {
            return BadRequest("A chat message was not submitted. Unable to forward to the message queue.");
        }
        if (string.IsNullOrEmpty(message.MessageDescription))
        {
            return BadRequest("The MessageDescription cannot be null or empty.");
        }

        // Publish the ChatMessage to SQS, using the generic publisher
        await _messagePublisher.PublishAsync(message);

        return Ok();
    }

    [HttpPost("order", Name = "Order")]
    public async Task<IActionResult> PublishOrder([FromBody] OrderInfo message)
    {
        if (message == null)
        {
            return BadRequest("An order was not submitted.");
        }

        // Publish the OrderInfo to SNS, using the generic publisher
        await _messagePublisher.PublishAsync(message);

        return Ok();
    }
}

特定于服务的发布者

上面的示例使用了通用的 IMessagePublisher,它可以基于配置的消息类型发布到任何支持AWS服务。该框架还提供了SQS、SNS和EventBridge的特定发布者。这些特定发布者公开了仅适用于该服务的选项,可以通过 ISQSPublisherISNSPublisherIEventBridgePublisher 类型进行注入。

例如,当向SQS FIFO队列发布消息时,必须设置适当的 消息分组ID。以下代码再次演示了 ChatMessage 示例,但这次使用了 ISQSPublisher 来设置SQS特定选项。

public class PublisherController : ControllerBase
{
    private readonly ISQSPublisher _sqsPublisher;

    public PublisherController(ISQSPublisher sqsPublisher)
    {
        _sqsPublisher = sqsPublisher;
    }

    [HttpPost("chatmessage", Name = "Chat Message")]
    public async Task<IActionResult> PublishChatMessage([FromBody] ChatMessage message)
    {
        // Perform business and validation logic on the ChatMessage here
        if (message == null)
        {
            return BadRequest("A chat message was not submitted. Unable to forward to the message queue.");
        }
        if (string.IsNullOrEmpty(message.MessageDescription))
        {
            return BadRequest("The MessageDescription cannot be null or empty.");
        }

        // Send the ChatMessage to SQS using the injected ISQSPublisher, with SQS-specific options
        await _sqsPublisher.SendAsync(message, new SQSOptions
        {
            DelaySeconds = <delay-in-seconds>,
            MessageAttributes = <message-attributes>,
            MessageDeduplicationId = <message-deduplication-id>,
            MessageGroupId = <message-group-id>
        });

        return Ok();
    }
}

对于SNS和EventBridge,可以使用相应的 ISNSPublisherIEventBridgePublisher 以相同的方式完成。

await _snsPublisher.PublishAsync(message, new SNSOptions
{
    Subject = <subject>,
    MessageAttributes = <message-attributes>,
    MessageDeduplicationId = <message-deduplication-id>,
    MessageGroupId = <message-group-id>
});
await _eventBridgePublisher.PublishAsync(message, new EventBridgeOptions
{
    DetailType = <detail-type>,
    Resources = <resources>,
    Source = <source>,
    Time = <time>,
    TraceHeader = <trace-header>
});

消费消息

要消费消息,需要为要处理的每个消息类型实现一个消息处理器,使用 IMessageHandler 接口。消息类型与消息处理器之间的映射在项目启动时进行配置。

await Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        // Register the AWS Message Processing Framework for .NET
        services.AddAWSMessageBus(builder =>
        {
            // Register an SQS Queue that the framework will poll for messages
            builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd");

            // Register all IMessageHandler implementations with the message type they should process. 
            // Here messages that match our ChatMessage .NET type will be handled by our ChatMessageHandler
            builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
        });
    })
    .Build()
    .RunAsync();

以下代码显示了一个用于 ChatMessage 消息的示例消息处理器。

public class ChatMessageHandler : IMessageHandler<ChatMessage>
{
    public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ChatMessage> messageEnvelope, CancellationToken token = default)
    {
        // Add business and validation logic here
        if (messageEnvelope == null)
        {
            return Task.FromResult(MessageProcessStatus.Failed());
        }

        if (messageEnvelope.Message == null)
        {
            return Task.FromResult(MessageProcessStatus.Failed());
        }

        ChatMessage message = messageEnvelope.Message;

        Console.WriteLine($"Message Description: {message.MessageDescription}");

        // Return success so the framework will delete the message from the queue
        return Task.FromResult(MessageProcessStatus.Success());
    }
}

外部的 MessageEnvelope 包含框架使用的元数据。它的 message 属性是消息类型(在这种情况下为 ChatMessage)。

可以通过返回 MessageProcessStatus.Success() 来指示已成功处理消息,并且框架将从SQS队列中删除该消息。当返回 MessageProcessStatus.Failed() 时,消息将保留在队列中,可以再次处理,或者根据配置将其移动到 死信队列

在长期运行过程中处理消息

您可以使用带有 SQS 队列 URL 的 AddSQSPoller 来启动一个长运行的 BackgroundService,该服务将连续轮询队列并处理消息。

await Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        // Register the AWS Message Processing Framework for .NET
        services.AddAWSMessageBus(builder =>
        {
            // Register an SQS Queue that the framework will poll for messages
            builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options => 
            {
                // The maximum number of messages from this queue that the framework will process concurrently on this client
                options.MaxNumberOfConcurrentMessages = 10;

                // The duration each call to SQS will wait for new messages
                options.WaitTimeSeconds = 20; 
            });

            // Register all IMessageHandler implementations with the message type they should process
            builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
        });
    })
    .Build()
    .RunAsync();

配置 SQS 消息轮询器

当调用 AddSQSPoller 时,可以使用 SQSMessagePollerOptions 配置 SQS 消息轮询器。

  • MaxNumberOfConcurrentMessages - 从队列中同时处理的最大消息数量。默认值为 10
  • WaitTimeSeconds - ReceiveMessage SQS 调用等待消息到达队列并返回的时间长度(以秒为单位)。如果消息可用,则调用会在少于 WaitTimeSeconds 的情况下返回。默认值是 20
处理消息可见性超时

当消费者开始处理特定消息时,SQS 消息会有一个 可见性超时 期间。它将保持队列中但隐藏在其他消费者后面,以避免多次处理。如果消息在再次可见之前未处理和删除,则另一个消费者可能尝试处理同一消息。

框架将跟踪并尝试扩展其正在处理的 messages 的可见性超时。您可以在调用 AddSQSPoller 时,在 SQSMessagePollerOptions 上配置此行为。

  • VisibilityTimeout - 已接收消息在后续检索请求中隐藏的时间长度(以秒为单位)。默认值是 30
  • VisibilityTimeoutExtensionThreshold - 当消息的可见性超时在其许多秒内到期时,框架将延长可见性超时(再延长 VisibilityTimeout 秒)。默认值是 5
  • VisibilityTimeoutExtensionHeartbeatInterval- 框架在多少秒内检查处于 VisibilityTimeoutExtensionThreshold 秒内的消息,然后延长其可见性超时。默认值是 1

以下示例中,框架将每 1 秒检查一次仍在处理的消息。对于那些在 5 秒内再次可见的消息,框架将自动将每个消息的可见性超时再延长 30 秒。

 builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options => 
{
    options.VisibilityTimeout = 30;
    options.VisibilityTimeoutExtensionThreshold = 5;
    VisibilityTimeoutExtensionHeartbeatInterval = 1;
});

处理 AWS Lambda 函数中的消息

您可以使用与 Lambda 集成的 SQS。这是由 AWS.Messaging.Lambda 包提供的。请参阅其 README 以开始。

SQS 轮询器弹性

设计上 SQS 轮询器是弹性的,能够以两种方式处理底层 .NET SDK 以及框架本身抛出的错误。我们已经将由于用户端配置无效而可能发生的许多异常分类为致命异常。这些异常将在抛出友好的错误消息后停止 SQS 轮询器后台服务的运行。但是,除了我们已经定义的致命异常之外,任何抛出的异常都不会导致 SQS 轮询器出错,并且将在底层服务遇到性能下降或中断的情况下保持弹性。SQS 轮询器通过应用一段时间延迟来重试失败的 SQS 请求。

框架定义了两个接口,IBackoffHandlerIBackoffPolicyIBackoffPolicy 与实现 IBackoffHandlerBackoffHandler 密切相关。默认的 IBackoffHandler 实现会检查附加的 IBackoffPolicy 来确定是否应该应用回退。如果应该应用回退,IBackoffPolicy 也会返回重试间的时间延迟。

框架支持三种回退策略

  • None,这会禁用回退处理器。这将使用户能够完全依赖 SDK 的重试逻辑。
  • Interval,会在给定的和可配置的间隔内进行退避。默认值是1秒。
  • CappedExponential,会进行指数退避,直到达到一定的可配置最大退避时间,此时会切换到间隔退避。默认的最大退避时间界限是1小时。

默认情况下,如果没有进行用户配置,SQS Poller将使用IBackoffHandler接口的默认实现,并与具有上限的指数退避策略结合使用。

用户可以按以下方式更改退避策略

services.AddAWSMessageBus(builder =>
{
    builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/536721586275/MPF");

    // Optional: Configure the backoff policy used by the SQS Poller.
    builder.ConfigureBackoffPolicy(options =>
    {
        // Use 1 of the available 3 backoff policies:

        // No backoff Policy
        options.UseNoBackoff();

        // Interval backoff policy
        options.UseIntervalBackoff(x =>
        {
            x.FixedInterval = 1;
        });

        // Capped exponential backoff policy
        options.UseCappedExponentialBackoff(x =>
        {
            x.CapBackoffTime = 60;
        });
    });
});

用户还可以通过实现IBackoffHandler接口并在此将AWS消息总线注入到DI容器之前来实现自己的退避处理器。以下是实现方式

services.TryAddSingleton<IBackoffHandler, CustomBackoffHandler>();

services.AddAWSMessageBus(builder =>
{
    builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MPF");
    builder.AddMessageHandler<ChatMessageHandler, ChatMessage>("chatMessage");
});

作为一个例子,您可以使用Polly来处理重试。一个实现此功能的示例应用程序可以在这里找到。

遥测

.NET的AWS消息处理框架已针对OpenTelemetry进行配置,以记录每个由框架发布的或处理的消息的跟踪。这由AWS.Messaging.Telemetry.OpenTelemetry包提供。请参阅其README以开始使用。

定制

框架在三个不同的“层”中构建、发送和处理消息

  1. 在最外层,框架构建AWS原生请求或响应,这些请求或响应针对特定服务。例如,与SQS一起,构建SendMessage请求,并处理由服务定义的Message对象。
  2. 在SQS请求和响应内部,将MessageBody元素(对于SNS为Message,对于EventBridge为Detail)设置为JSON格式的CloudEvent。此内容包含框架设置的元数据,当处理消息时可以在MessageEnvelope对象上访问。
  3. 在内层,CloudEvent JSON对象内的data属性包含消息发送或接收时的.NET对象的JSON序列化。
{
    "id":"b02f156b-0f02-48cf-ae54-4fbbe05cffba",
    "source":"/aws/messaging",
    "specversion":"1.0",
    "type":"Publisher.Models.ChatMessage",
    "time":"2023-11-21T16:36:02.8957126+00:00",
    "data":"<the ChatMessage object serialized as JSON>"
}

您可以根据需要自定义消息信封的配置和读取方式

  • "id"唯一标识消息。默认情况下,它被设置为一个新的GUID,但这可以通过实现自己的IMessageIdGenerator并将其注入到DI容器中来自定义。
  • "type"控制消息如何路由到处理器。默认情况下,这使用与消息对应的.NET类型的全名。您可以通过在将消息类型映射到目的地时使用messageTypeIdentifier参数来覆盖此功能,通过AddSQSPublisherAddSNSPublisherAddEventBridgePublisher
  • "source"指示哪个系统或服务器发送了消息。
    • 如果从AWS Lambda发布,这将是指定的函数名,如果在Amazon ECS上,则是指定的集群名称和任务ARN,如果在Amazon EC2上,则是指定的实例ID,否则将使用回退值/aws/messaging
    • 您可以通过在MessageBusBuilder上的AddMessageSourceAddMessageSourceSuffix中覆盖此功能。
  • "time"设置为UTC中的当前DateTime。您可以实现自己的IDateTimeHandler并将其注入到DI容器中来自定义。
  • "data"包含发送或接收的消息的.NET对象的JSON表示
    • MessageBusBuilder上配置ConfigureSerializationOptions允许您配置用于序列化和反序列化消息的 System.Text.Json.JsonSerializerOptions
    • 在框架构建完消息后,要注入额外的属性或转换消息封套,您可以实现ISerializationCallback,并通过在MessageBusBuilder上调用AddSerializationCallback进行注册。

获取帮助

有关对该框架的功能请求或问题,请在此存储库中打开 问题

贡献

我们欢迎社区贡献和拉取请求。有关如何提交代码的信息,请参阅CONTRIBUTING.md

安全

.NET版本的AWS消息处理框架依赖于AWS SDK for .NET与AWS通信。有关更多信息,请参阅安全部分,位于AWS SDK for .NET 开发者指南中。您也可以在AWS Message Processing Framework for .NET 开发者指南中找到更多信息。

出于安全考虑,该框架不会记录用户发送的数据消息。如果用户想为了调试目的启用此功能,您需要在AWS消息总线中调用以下方法EnableMessageContentLogging()

builder.Services.AddAWSMessageBus(bus =>
{
    builder.EnableMessageContentLogging();
});

如果发现潜在的安全问题,请参考安全策略以报告信息。

其他资源

许可证

本项目使用Apache-2.0许可证。

产品 兼容和额外的计算目标框架版本。
.NET net6.0兼容。 net6.0-android已计算。 net6.0-ios已计算。 net6.0-maccatalyst已计算。 net6.0-macos已计算。 net6.0-tvos已计算。 net6.0-windows已计算。 net7.0已计算。 net7.0-android已计算。 net7.0-ios已计算。 net7.0-maccatalyst已计算。 net7.0-macos已计算。 net7.0-tvos已计算。 net7.0-windows已计算。 net8.0已计算。 net8.0-android已计算。 net8.0-browser已计算。 net8.0-ios已计算。 net8.0-maccatalyst已计算。 net8.0-macos已计算。 net8.0-tvos已计算。 net8.0-windows已计算。
兼容的目标框架
包含的目标框架(在包内)
了解更多关于目标框架.NET标准的信息。

NuGet包 (2)

显示依赖AWS.Messaging的Top 2个NuGet包

下载
AWS.Messaging.Lambda

此包是AWS消息处理框架的.NET插件,允许.NET Lambda函数处理框架发布的消息。

AWS.Messaging.Telemetry.OpenTelemetry

这是一个用于收集发送和接收的消息跟踪的软件包,该软件包为.NET消息处理框架提供仪器库。

GitHub 仓库

此软件包未被任何流行的GitHub仓库使用。

版本 下载 最后更新
0.9.1 5,678 4/22/2024
0.9.0 4,130 3/26/2024
0.3.0-beta 103 3/20/2024
0.2.0-beta 131 3/8/2024
0.1.0-beta 360 12/8/2023