Picton.Messaging 9.1.0

dotnet add package Picton.Messaging --version 9.1.0                
NuGet\Install-Package Picton.Messaging -Version 9.1.0                
此命令旨在在 Visual Studio 的包管理器控制台中使用,因为它们使用 NuGet 模块的版本 Install-Package
<PackageReference Include="Picton.Messaging" Version="9.1.0" />                
对于支持 PackageReference 的项目,将此 XML 节复制到项目文件中,以引用该包。
paket add Picton.Messaging --version 9.1.0                
#r "nuget: Picton.Messaging, 9.1.0"                
#r 指令可用于 F# Interactive 和 Polyglot Notebooks。将此复制到交互式工具或脚本源代码中,以引用包。
// Install Picton.Messaging as a Cake Addin
#addin nuget:?package=Picton.Messaging&version=9.1.0

// Install Picton.Messaging as a Cake Tool
#tool nuget:?package=Picton.Messaging&version=9.1.0                

Picton

License Build status Coverage Status FOSSA Status

关于

Picton.Messaging 是一个 C# 库,其中包含一个高性能的消息处理器(也称为消息“泵”),旨在尽可能高效地处理来自 Azure 存储队列的消息。

我创建了 Picton.Mesaging,因为我需要一个方法来以尽可能快和高效的方式处理从 Azure 存储队列的大量消息。我搜索了很长时间,但从未找到满足我所有要求的解决方案。

2016年3月,我参加了Daniel Marbach举办的三个关于“异步/Await与任务并行库”的Webinar(《Daniel Marbach关于异步/Await与任务并行库的Webinar》(第1部分:https://github.com/danielmarbach/02-25-2016-AsyncWebinar)、第2部分:https://github.com/danielmarbach/03-03-2016-AsyncWebinar和第3部分:https://github.com/danielmarbach/03-10-2016-AsyncWebinar))。第2部分对我来说特别有趣,因为Daniel提出了一个通用的消息泵,它满足了我大部分(但不是全部)的需求。具体来说,Daniel的消息泵符合以下标准:

  • 并发消息处理。这意味着可以同时处理多个消息。这对于消息处理逻辑是I/O密集型的情况尤为重要。
  • 限制并发。这意味着我们可以决定同时可以处理的最大消息数。
  • 取消和优雅关闭。这意味着我们可以通知消息泵我们希望停止处理更多的消息,并且我们还可以决定在我们决定停止时对正在处理的消息采取什么行动。

Daniel在Webinar中分享的示例代码非常通用,并不特定于Azure,所以我做出了以下增强:

  • 从Azure存储队列中获取消息。我们可以指定哪个队列,甚至可以是存储模拟器中的队列。
  • 延迟。当在队列中没有找到消息时,缩小规模并减少对队列的查询是非常重要的。一旦队列中有新的消息,我们希望尽可能快地缩放并处理这些消息。为了实现这一逻辑,我做了以下两项改进:
    • 首先,当队列中没有找到消息时引入一个暂停,以减少对存储队列的查询次数。默认情况下,我们会暂停一秒钟,但这可以配置。你甚至可以完全消除暂停,但我不建议这样做。
    • 其次,减少并发消息处理任务的数目。这是Picton库(我个人认为)引入的最重要改进!Daniel的示例代码使用SemaphoreSlim作为“看门人”并限制可以同时执行的任务的数目。然而,由信号量允许的“槽位”数量必须预先确定,且是固定的。Picton消除了这个限制,并允许这个数字根据队列中消息的存在或不存在动态增加和减少。

2017年12月,发布了第2.0版,该版本引入了一种从Azure队列获取消息的更有效的方法:现在有一个单独的任务来完成这个任务,而不是允许每个并发任务自己获取消息。这意味着不再需要逻辑来增加/减少SemaphoreSlim中可用槽位的数量,并且已经移除。

2024年1月,发布了第9.0版,增加了两个主要新功能:消息泵现在能够监控多个队列,并且添加了一个专门的消息泵版本以监控遵循特定名称约定的队列。此外,这个专门的消息泵会定期查询Azure存储以检测是否创建了新的队列。在我看来,这是当你有一个有多个租户的解决方案且每个租户有一个队列时的理想解决方案。

Nuget

Picton.Messaging作为一个Nuget包可用。

NuGet Version

安装

将Picton.Messaging包含到你的C#项目中最简单的方法是抓取Nuget包。

PM> Install-Package Picton.Messaging

如何使用

一旦你在项目中正确引用了Picton Messaging库,你需要以下两个CSharp文件:

Program.cs

using WorkerService1;

var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService<Worker>();

var host = builder.Build();
host.Run();

Worker.cs

using Picton.Messaging;

namespace WorkerService1
{
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;

        public Worker(ILogger<Worker> logger)
        {
            _logger = logger;
        }
        
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var connectionString = "<--  connection string for your Azure storage account -->";
            var concurrentTask = 10; // <-- this is the max number of messages that can be processed at a time

            // Configure the message pump
            var options = new MessagePumpOptions(connectionString, concurrentTasks);
            var messagePump = new AsyncMessagePump(options)
            {
                OnMessage = (queueName, message, cancellationToken) =>
                {
                    // This is where you insert your custom logic to process a message
                },
                OnError = (queueName, message, exception, isPoison) =>
                {
                    // Insert your custom error handling

                    // ==========================================================================
                    // Important note regarding "isPoison":
                    // --------------------------------------------------------------------------
                    // this parameter indicates whether this message has exceeded the maximum
                    // number of retries. 
                    //
                    // When you have configured the "poison queue name" for the given queue and 
                    // this parameter is "true", the message is automatically copied to the poison
                    // queue and removed from the original queue.
                    // 
                    // If you have not configured the "poison queue name" for the given queue and
                    // this parameter is "true", the message is automatically removed from the
                    // original queue and you are responsible for storing the message. If you don't,
                    // this mesage will be lost.
                    // ==========================================================================
                }
            };

            // Replace the following samples with the queues you want to monitor
            messagePump.AddQueue("queue01", "queue01-poison", TimeSpan.FromMinutes(1), 3, "queue01-oversize-messages");
            messagePump.AddQueue("queue02", "queue02-poison", TimeSpan.FromMinutes(1), 3, "queue02-oversize-messages");
            messagePump.AddQueue("queue03", "queue03-poison", TimeSpan.FromMinutes(1), 3, "queue03-oversize-messages");

            // Queues can share the same poison queue
            messagePump.AddQueue("queue04", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue04-oversize-messages");
            messagePump.AddQueue("queue05", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue05-oversize-messages");

            // Queues can also share the same blob storage for messages that exceed the max size
            messagePump.AddQueue("queue06", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");
            messagePump.AddQueue("queue07", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");

            // You can add all queues matching a given RegEx pattern
            await messagePump.AddQueuesByPatternAsync("myqueue*", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob", cancellationToken).ConfigureAwait(false);

            // Start the mesage pump (the token is particularly important because that's how the message pump will be notified when the worker is stopping)
            await messagePump.StartAsync(stoppingToken).ConfigureAwait(false);
        }
    }
}

消息处理器

如前一个代码示例所示,您可以定义自己的 OnMessage 委托,当消息处理程序处理每个消息时将会调用它。这在您的 C# 逻辑相当简单的情况下非常适用。然而,随着您逻辑的复杂性增加,您的 C# 代码可以变得相当复杂。

Picton.Messaging 库包括一个更高级的消息处理程序,可以帮助您简化这种情况:AsyncMessagePumpWithHandlers。您必须为每个消息类型定义所谓的“处理程序”。这些处理程序仅仅是一些实现 IMessageHandler<T> 接口的 C# 类,其中 T 是消息的类型。例如,如果您预计要处理类型为 MyMessage 的消息,您必须定义一个具有类似以下签名的类:public class MyMessageHandler : IMessageHandler<MyMessage>

一旦创建了所有需要的处理程序,您必须像以下示例中那样将它们注册到解决方案的 DI 服务容器中

Program.cs

using Picton.Messaging;
using WorkerService1;

var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService<Worker>();

/*
    You can either register your message handlers one by one like this:
    
    builder.Services.AddSingleton<IMessageHandler<MyMessage>, MyMessageHandler>()
    builder.Services.AddSingleton<IMessageHandler<MyOtherMessage>, MyOtherMessageHandler>()
    builder.Services.AddSingleton<IMessageHandler<AnotherMessage>, AnotherMessageHandler>()
*/

// Or you can allow Picton.Messaging to scan your assemblies and to register all message handlers like this:
builder.Services.AddPictonMessageHandlers()

var host = builder.Build();
host.Run();

Worker.cs

using Picton.Messaging;

namespace WorkerService1
{
    public class Worker : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<Worker> _logger;

        public Worker(IServiceProvider serviceProvider, ILogger<Worker> logger)
        {
            _serviceProvider = serviceProvider;
            _logger = logger;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var connectionString = "<--  connection string for your Azure storage account -->";
            var concurrentTask = 10; // <-- this is the max number of messages that can be processed at a time

            var options = new MessagePumpOptions(connectionString, concurrentTasks, null, null);
            var messagePump = new AsyncMessagePumpWithHandlers(options, _serviceProvider, _logger)
            {
                OnError = (queueName, message, exception, isPoison) =>
                {
                    // Insert your custom error handling
                }
            };

            messagePump.AddQueue("myqueue", null, TimeSpan.FromMinutes(1), 3);

            await messagePump.StartAsync(stoppingToken).ConfigureAwait(false);
        }
    }
}

许可证

FOSSA Status

产品 兼容的和额外的计算目标框架版本。
.NET net5.0 已计算。 net5.0-windows 已计算。 net6.0 已计算。 net6.0-android 已计算。 net6.0-ios 已计算。 net6.0-maccatalyst 已计算。 net6.0-macos 已计算。 net6.0-tvos 已计算。 net6.0-windows 已计算。 net7.0 兼容。 net7.0-android 已计算。 net7.0-ios 已计算。 net7.0-maccatalyst 已计算。 net7.0-macos 已计算。 net7.0-tvos 已计算。 net7.0-windows 已计算。 net8.0 已计算。 net8.0-android 已计算。 net8.0-browser 已计算。 net8.0-ios 已计算。 net8.0-maccatalyst 已计算。 net8.0-macos 已计算。 net8.0-tvos 已计算。 net8.0-windows 已计算。
.NET Core netcoreapp3.0 已计算。 netcoreapp3.1 已计算。
.NET 标准库 netstandard2.1 兼容。
.NET 框架 net48 兼容。 net481 已计算。
MonoAndroid monoandroid 已计算。
MonoMac monomac 已计算。
MonoTouch monotouch 已计算。
Tizen tizen60 已计算。
Xamarin.iOS xamarinios 已计算。
Xamarin.Mac xamarinmac 已计算。
Xamarin.TVOS xamarintvos 已计算。
Xamarin.WatchOS xamarinwatchos 已计算。
兼容的目标框架
包含的目标框架(在包中)
有关 目标框架 .NET 标准库 的更多信息。

NuGet 包

此包没有使用任何 NuGet 包。

GitHub 存储库

此包没有使用任何流行的 GitHub 存储库。

版本 下载 最后更新
9.1.0 111 2/24/2024
9.0.0 105 1/30/2024
8.0.0 160 1/1/2024
7.0.0 630 3/27/2020
6.0.0 643 5/22/2019
5.0.0 684 2/8/2019
4.0.0 978 5/31/2018
3.2.0 972 5/24/2018
3.1.0 987 5/8/2018
3.0.0 1,053 2/13/2018
2.0.0 1,071 12/27/2017
1.1.1 1,678 7/14/2017