Picton.Messaging 9.1.0
dotnet add package Picton.Messaging --version 9.1.0
NuGet\Install-Package Picton.Messaging -Version 9.1.0
<PackageReference Include="Picton.Messaging" Version="9.1.0" />
paket add Picton.Messaging --version 9.1.0
#r "nuget: Picton.Messaging, 9.1.0"
// Install Picton.Messaging as a Cake Addin #addin nuget:?package=Picton.Messaging&version=9.1.0 // Install Picton.Messaging as a Cake Tool #tool nuget:?package=Picton.Messaging&version=9.1.0
Picton
关于
Picton.Messaging 是一个 C# 库,其中包含一个高性能的消息处理器(也称为消息“泵”),旨在尽可能高效地处理来自 Azure 存储队列的消息。
我创建了 Picton.Mesaging,因为我需要一个方法来以尽可能快和高效的方式处理从 Azure 存储队列的大量消息。我搜索了很长时间,但从未找到满足我所有要求的解决方案。
2016年3月,我参加了Daniel Marbach举办的三个关于“异步/Await与任务并行库”的Webinar(《Daniel Marbach关于异步/Await与任务并行库的Webinar》(第1部分:https://github.com/danielmarbach/02-25-2016-AsyncWebinar)、第2部分:https://github.com/danielmarbach/03-03-2016-AsyncWebinar和第3部分:https://github.com/danielmarbach/03-10-2016-AsyncWebinar))。第2部分对我来说特别有趣,因为Daniel提出了一个通用的消息泵,它满足了我大部分(但不是全部)的需求。具体来说,Daniel的消息泵符合以下标准:
- 并发消息处理。这意味着可以同时处理多个消息。这对于消息处理逻辑是I/O密集型的情况尤为重要。
- 限制并发。这意味着我们可以决定同时可以处理的最大消息数。
- 取消和优雅关闭。这意味着我们可以通知消息泵我们希望停止处理更多的消息,并且我们还可以决定在我们决定停止时对正在处理的消息采取什么行动。
Daniel在Webinar中分享的示例代码非常通用,并不特定于Azure,所以我做出了以下增强:
- 从Azure存储队列中获取消息。我们可以指定哪个队列,甚至可以是存储模拟器中的队列。
- 延迟。当在队列中没有找到消息时,缩小规模并减少对队列的查询是非常重要的。一旦队列中有新的消息,我们希望尽可能快地缩放并处理这些消息。为了实现这一逻辑,我做了以下两项改进:
- 首先,当队列中没有找到消息时引入一个暂停,以减少对存储队列的查询次数。默认情况下,我们会暂停一秒钟,但这可以配置。你甚至可以完全消除暂停,但我不建议这样做。
- 其次,减少并发消息处理任务的数目。这是Picton库(我个人认为)引入的最重要改进!Daniel的示例代码使用
SemaphoreSlim
作为“看门人”并限制可以同时执行的任务的数目。然而,由信号量允许的“槽位”数量必须预先确定,且是固定的。Picton消除了这个限制,并允许这个数字根据队列中消息的存在或不存在动态增加和减少。
2017年12月,发布了第2.0版,该版本引入了一种从Azure队列获取消息的更有效的方法:现在有一个单独的任务来完成这个任务,而不是允许每个并发任务自己获取消息。这意味着不再需要逻辑来增加/减少SemaphoreSlim中可用槽位的数量,并且已经移除。
2024年1月,发布了第9.0版,增加了两个主要新功能:消息泵现在能够监控多个队列,并且添加了一个专门的消息泵版本以监控遵循特定名称约定的队列。此外,这个专门的消息泵会定期查询Azure存储以检测是否创建了新的队列。在我看来,这是当你有一个有多个租户的解决方案且每个租户有一个队列时的理想解决方案。
Nuget
Picton.Messaging作为一个Nuget包可用。
安装
将Picton.Messaging包含到你的C#项目中最简单的方法是抓取Nuget包。
PM> Install-Package Picton.Messaging
如何使用
一旦你在项目中正确引用了Picton Messaging库,你需要以下两个CSharp文件:
Program.cs
using WorkerService1;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService<Worker>();
var host = builder.Build();
host.Run();
Worker.cs
using Picton.Messaging;
namespace WorkerService1
{
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
public Worker(ILogger<Worker> logger)
{
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var connectionString = "<-- connection string for your Azure storage account -->";
var concurrentTask = 10; // <-- this is the max number of messages that can be processed at a time
// Configure the message pump
var options = new MessagePumpOptions(connectionString, concurrentTasks);
var messagePump = new AsyncMessagePump(options)
{
OnMessage = (queueName, message, cancellationToken) =>
{
// This is where you insert your custom logic to process a message
},
OnError = (queueName, message, exception, isPoison) =>
{
// Insert your custom error handling
// ==========================================================================
// Important note regarding "isPoison":
// --------------------------------------------------------------------------
// this parameter indicates whether this message has exceeded the maximum
// number of retries.
//
// When you have configured the "poison queue name" for the given queue and
// this parameter is "true", the message is automatically copied to the poison
// queue and removed from the original queue.
//
// If you have not configured the "poison queue name" for the given queue and
// this parameter is "true", the message is automatically removed from the
// original queue and you are responsible for storing the message. If you don't,
// this mesage will be lost.
// ==========================================================================
}
};
// Replace the following samples with the queues you want to monitor
messagePump.AddQueue("queue01", "queue01-poison", TimeSpan.FromMinutes(1), 3, "queue01-oversize-messages");
messagePump.AddQueue("queue02", "queue02-poison", TimeSpan.FromMinutes(1), 3, "queue02-oversize-messages");
messagePump.AddQueue("queue03", "queue03-poison", TimeSpan.FromMinutes(1), 3, "queue03-oversize-messages");
// Queues can share the same poison queue
messagePump.AddQueue("queue04", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue04-oversize-messages");
messagePump.AddQueue("queue05", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue05-oversize-messages");
// Queues can also share the same blob storage for messages that exceed the max size
messagePump.AddQueue("queue06", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");
messagePump.AddQueue("queue07", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");
// You can add all queues matching a given RegEx pattern
await messagePump.AddQueuesByPatternAsync("myqueue*", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob", cancellationToken).ConfigureAwait(false);
// Start the mesage pump (the token is particularly important because that's how the message pump will be notified when the worker is stopping)
await messagePump.StartAsync(stoppingToken).ConfigureAwait(false);
}
}
}
消息处理器
如前一个代码示例所示,您可以定义自己的 OnMessage
委托,当消息处理程序处理每个消息时将会调用它。这在您的 C# 逻辑相当简单的情况下非常适用。然而,随着您逻辑的复杂性增加,您的 C# 代码可以变得相当复杂。
Picton.Messaging 库包括一个更高级的消息处理程序,可以帮助您简化这种情况:AsyncMessagePumpWithHandlers
。您必须为每个消息类型定义所谓的“处理程序”。这些处理程序仅仅是一些实现 IMessageHandler<T>
接口的 C# 类,其中 T
是消息的类型。例如,如果您预计要处理类型为 MyMessage
的消息,您必须定义一个具有类似以下签名的类:public class MyMessageHandler : IMessageHandler<MyMessage>
。
一旦创建了所有需要的处理程序,您必须像以下示例中那样将它们注册到解决方案的 DI 服务容器中
Program.cs
using Picton.Messaging;
using WorkerService1;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService<Worker>();
/*
You can either register your message handlers one by one like this:
builder.Services.AddSingleton<IMessageHandler<MyMessage>, MyMessageHandler>()
builder.Services.AddSingleton<IMessageHandler<MyOtherMessage>, MyOtherMessageHandler>()
builder.Services.AddSingleton<IMessageHandler<AnotherMessage>, AnotherMessageHandler>()
*/
// Or you can allow Picton.Messaging to scan your assemblies and to register all message handlers like this:
builder.Services.AddPictonMessageHandlers()
var host = builder.Build();
host.Run();
Worker.cs
using Picton.Messaging;
namespace WorkerService1
{
public class Worker : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<Worker> _logger;
public Worker(IServiceProvider serviceProvider, ILogger<Worker> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var connectionString = "<-- connection string for your Azure storage account -->";
var concurrentTask = 10; // <-- this is the max number of messages that can be processed at a time
var options = new MessagePumpOptions(connectionString, concurrentTasks, null, null);
var messagePump = new AsyncMessagePumpWithHandlers(options, _serviceProvider, _logger)
{
OnError = (queueName, message, exception, isPoison) =>
{
// Insert your custom error handling
}
};
messagePump.AddQueue("myqueue", null, TimeSpan.FromMinutes(1), 3);
await messagePump.StartAsync(stoppingToken).ConfigureAwait(false);
}
}
}
许可证
产品 | 版本 兼容的和额外的计算目标框架版本。 |
---|---|
.NET | net5.0 已计算。 net5.0-windows 已计算。 net6.0 已计算。 net6.0-android 已计算。 net6.0-ios 已计算。 net6.0-maccatalyst 已计算。 net6.0-macos 已计算。 net6.0-tvos 已计算。 net6.0-windows 已计算。 net7.0 兼容。 net7.0-android 已计算。 net7.0-ios 已计算。 net7.0-maccatalyst 已计算。 net7.0-macos 已计算。 net7.0-tvos 已计算。 net7.0-windows 已计算。 net8.0 已计算。 net8.0-android 已计算。 net8.0-browser 已计算。 net8.0-ios 已计算。 net8.0-maccatalyst 已计算。 net8.0-macos 已计算。 net8.0-tvos 已计算。 net8.0-windows 已计算。 |
.NET Core | netcoreapp3.0 已计算。 netcoreapp3.1 已计算。 |
.NET 标准库 | netstandard2.1 兼容。 |
.NET 框架 | net48 兼容。 net481 已计算。 |
MonoAndroid | monoandroid 已计算。 |
MonoMac | monomac 已计算。 |
MonoTouch | monotouch 已计算。 |
Tizen | tizen60 已计算。 |
Xamarin.iOS | xamarinios 已计算。 |
Xamarin.Mac | xamarinmac 已计算。 |
Xamarin.TVOS | xamarintvos 已计算。 |
Xamarin.WatchOS | xamarinwatchos 已计算。 |
-
.NETFramework 4.8
- App.Metrics (>= 4.3.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 7.0.0)
- Microsoft.Extensions.DependencyModel (>= 7.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 7.0.1)
- Picton (>= 9.2.0)
- System.Threading.Channels (>= 8.0.0)
-
.NETStandard 2.1
- App.Metrics (>= 4.3.0)
- Microsoft.CSharp (>= 4.7.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 7.0.0)
- Microsoft.Extensions.DependencyModel (>= 7.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 7.0.1)
- Picton (>= 9.2.0)
- System.Threading.Channels (>= 8.0.0)
-
net7.0
- App.Metrics (>= 4.3.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 7.0.0)
- Microsoft.Extensions.DependencyModel (>= 7.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 7.0.1)
- Picton (>= 9.2.0)
NuGet 包
此包没有使用任何 NuGet 包。
GitHub 存储库
此包没有使用任何流行的 GitHub 存储库。