StreamFlow.RabbitMq.Prometheus 9.1.0
dotnet add package StreamFlow.RabbitMq.Prometheus --version 9.1.0
NuGet\Install-Package StreamFlow.RabbitMq.Prometheus -Version 9.1.0
<PackageReference Include="StreamFlow.RabbitMq.Prometheus" Version="9.1.0" />
paket add StreamFlow.RabbitMq.Prometheus --version 9.1.0
#r "nuget: StreamFlow.RabbitMq.Prometheus, 9.1.0"
// Install StreamFlow.RabbitMq.Prometheus as a Cake Addin #addin nuget:?package=StreamFlow.RabbitMq.Prometheus&version=9.1.0 // Install StreamFlow.RabbitMq.Prometheus as a Cake Tool #tool nuget:?package=StreamFlow.RabbitMq.Prometheus&version=9.1.0
StreamFlow - 另一个 RabbitMQ 处理程序
理由
我知道在野外有多个非常酷的 RabbitMQ 包装器(MassTransit、EasyNetQ 等),它们比这个库做得更好。然而,我从未能够像现在这样使用它们,而不需要重写多个方法或类。所以,我决定根据自己的需求制作一个自己的包装器。
通过 NuGet 安装
如果您想将 HTTP 溢出包含到您的项目中,您可以直接从 NuGet 安装它。
要安装包,请在包管理器控制台中运行以下命令
PM> Install-Package StreamFlow.RabbitMq
概念
在 RabbitMQ 服务器中,所有名称都可以通过实现 IRabbitMqConventions 并在依赖注入中注册来定义。但是默认行为如下
- 交换名称使用请求类名称创建
- 队列名称使用
<request class name>:<request handler name>[:<service id>][:<consumer group name>]
创建 - 错误队列名称使用
<request class name>:<request handler name>[:<service id>][:<consumer group name>]:Error
创建
A consumer group is a group of consumers that share the same group id.
When a topic is consumed by consumers in the same group, every record
will be delivered to only one consumer.
If all the consumer instances have the same consumer group, then the
records will effectively be load-balanced over the consumer instances.
Service id identifies service implementation. As for example if we have
multiple services like customers and orders - they will have different
service id's. Most simple case here would be to use a short memorable
identifier like: "customers", "orders".
The purpose of such id is to distinguish message handlers which can have
exactly the same name and can handle exactly the same message. Since these
are in different services - load balancing scenario is not acceptable in this case.
It can be also treated as a consumer group but in this case it applies to whole service.
组件生命周期
连接
连接对象作为单例注册。然而,消费者和发布者连接之间存在分离。这意味着单个服务实例可以与 RabbitMQ 服务器建立最多两个连接。
IPublisher
使用单例生命周期。内部使用大小为 100_000 项的 Channel 集合。如果代码中的发布操作比此集合的处理速度快得多,则您可能会收到 RabbitMqPublisherException,原因:InternalQueueIsFull。
底层数据发布者使用异步发布确认。这意味着每次PublishAsync实际都是异步等待服务器确认。有关更详细的解释,请参阅RabbitMQ文档:https://rabbitmq.cn/confirms.html#publisher-confirms 在这里,您还可以看到更多关于该行为的详细示例:https://rabbitmq.cn/tutorials/tutorial-seven-dotnet.html 概括来说:如果您对每个PublishAsync方法进行await,实际上就像以下示例一样等待每个发表的确认
public async Task PublishMessagesAsync<T>(T [] messages)
{
foreach (var message in messages)
{
try
{
await _publisher.PublishAsync(message);
}
catch (Exception e)
{
// handle publication errors
}
}
}
上面的示例可能相对较慢,但它非常简单易实现。为了提高发布性能,您可以在示例中采取类似的方法
public async Task PublishMessagesAsync<T>(T [] messages)
{
var tasks = new Task[messages.Length];
for (var i = 0; i < messages.Length; ++i)
{
var message = messages[i];
tasks[i] = _publisher.PublishAsync(message);
}
foreach (var task in tasks)
{
try
{
await task;
}
catch (Exception e)
{
// handle publication errors
}
}
}
注意:如果您没有指定取消令牌(或者指定默认),或者发布选项中没有指定超时,库会自动添加60秒的超时。如果应用程序在指定时间内未收到响应,则会标记消息为已取消。
中间件
每个中间件都是作为瞬态添加的。每个中间件将在每个接收或发布的消息上创建。
消费者
每个消费者都是单例的。但是对于每个接收到的消息,会创建一个自己的作用域,在处理结束后进行释放。
处理器
处理器作为范围实例进行注册。
非常简单易用
定义消息
public class PingRequest
{
public DateTime Timestamp { get; set; }
}
发布消息
public class PingController
{
private readonly IPublisher _publisher;
public PingController(IPublisher publisher)
{
_publisher = publisher;
}
public async Task SendPing()
{
await _publisher.PublishAsync(new PingRequest { Timestamp = DateTime.UtcNow; });
}
}
定义消息消费者
public class PingRequestConsumer : IConsumer<PingRequest>
{
public Task Handle(IMessage<PingRequest> message, CancellationToken cancellation)
{
Console.WriteLine(message.Body.Timestamp);
return Task.CompletedTask;
}
}
使用ASP.NET Core进行配置
services.AddStreamFlow(transport =>
{
transport
.UsingRabbitMq(mq => mq
.Connection("localhost", "guest", "guest")
.StartConsumerHostedService()
)
.Consumers(builder => builder
.Add<PingRequest, PingRequestConsumer>(options => options
.ConsumerCount(5)
.ConsumerGroup("gr1"))
.Add<PingRequest, PingRequestConsumer>(options => options
.ConsumerCount(5)
.ConsumerGroup("gr2"))
)
.ConfigureConsumerPipe(builder => builder
.Use<LogAppIdMiddleware>()
)
.ConfigurePublisherPipe(builder => builder
.Use(_ => new SetAppIdMiddleware("Published from StreamFlow.Tests.AspNetCore"))
);
});
上面的代码注册了流流类,配置了RabbitMQ连接,并指示启动ASP.NET Core托管服务,该服务启动了配置的消费者。它配置了2个消费者分组,每组启动5个实例。
ConfigureConsumerPipe和ConfigurePublisherPipe注册了在收到消息或发布时执行的类似于中间件的操作。以下是这些中间件的代码
// used at consumer side to log received AppId
public class LogAppIdMiddleware : IStreamFlowMiddleware
{
private readonly ILogger<LogAppIdMiddleware> _logger;
public LogAppIdMiddleware(ILogger<LogAppIdMiddleware> logger)
{
_logger = logger;
}
public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
{
if (!string.IsNullOrWhiteSpace(context.AppId))
{
_logger.LogInformation("AppId: {AppId}", context.AppId);
}
return next(context);
}
}
// used at publisher side to set custom AppId
public class SetAppIdMiddleware : IStreamFlowMiddleware
{
private readonly string _appId;
public SetAppIdMiddleware(string appId)
{
_appId = appId;
}
public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
{
context.WithAppId(_appId);
return next(context);
}
}
尽管这些示例中间件在现实世界场景中非常简单,但可以构建非常强大的实现
- 重试
- 异常处理
- 日志记录
- 度量
- ...
MediatR
有一个为MediatR库实现的示例,这可能会极大地简化消费者实现。
简单安装包
Install-Package StreamFlow.RabbitMq.MediatR
库希望MediatR本身已在依赖注入容器中配置并可用。
MediatR通知
请求类必须实现INotification接口,例如
public class PingNotification : INotification
{
}
然后它可以在StreamFlow中使用
....
.Consumers(builder => builder
.AddNotification<PingNotification>()
)
....
MediatR无响应请求
请求类必须实现IRequest接口,例如
public class PingRequest : IRequest
{
}
然后它可以在StreamFlow中使用
....
.Consumers(builder => builder
.AddRequest<PingRequest>()
)
....
MediatR带响应请求
请求类必须实现IRequest接口,例如
public class PongResponse
{
}
public class PingPongRequest : IRequest<PongResponse>
{
}
然后它可以在StreamFlow中使用
....
.Consumers(builder => builder
.AddRequest<PingPongRequest, PongResponse>()
)
....
然而在这种情况下,您会得到额外的行为:响应是通过RabbitMQ发布者总线发送的。为了使这成为可能,您还需要启用发布主机(请参阅EnablePublisherHost调用)
services.AddStreamFlow(transport =>
{
transport
.UseRabbitMq(mq => mq
.Connection("localhost", "guest", "guest")
.EnableConsumerHost(consumer => consumer.Prefetch(5))
.WithPrometheusMetrics()
.WithPublisherOptions(publisher => publisher
.EnablePublisherHost()
)
)
}
度量
库支持各种度量,这些度量表示有关发布者和消费者洞察的各种信息。开箱即用,就有Prometheus度量实现的示例,但当然它不仅限于此。
为了启用度量,请添加Prometheus包
PM> Install-Package StreamFlow.RabbitMq.Prometheus
并在配置RabbitMQ时启用度量(请参阅:WithPrometheusMetrics)
services.AddStreamFlow(streamFlowOptions, transport =>
{
transport
.UseRabbitMq(mq => mq
.Connection(options.Host, options.Username, options.Password, options.VirtualHost)
.WithPrometheusMetrics()
....
);
});
Prometheus度量
streamflow_messages_published
表示已发布消息的数量和持续时间的直方图(秒)
标签:交易所、状态[完成或失败]
streamflow_bus_messages_published
表示使用发布者主机时发布消息的数量和持续时间的直方图(秒)
标签:交易所、状态[完成或失败]
streamflow_messages_publishing_events
表示在发布过程中发生的各种事件的直方图,如通道创建、准备、序列化、事务提交等,有助于了解发布过程中是否存在任何瓶颈
标签:交易所、事件
streamflow_messages_publishing_errors
表示在消息发布过程中发生的异常的计数器
标签:交易所
streamflow_bus_messages_publishing_errors
表示使用发布者主机时在消息发布过程中发生的异常的计数器
标签:交易所
streamflow_messages_consumed
表示已消费消息的数量和消费者持续时间的直方图(秒)
标签:交易所、队列、状态
streamflow_messages_consumed_errors
计数器,表示在消费者处理过程中获得的异常
标签:交换、队列
streamflow_messages_bus_publishing
计数器,显示使用发布者主机发布了多少条消息
streamflow_messages_bus_publishing_errors
计数器,表示使用发布者主机发布时的错误,因为发布者主机通道是有限制的 - 当发布者主机接收的消息超出其实际能够发送到RabbitMQ的能力时,该计数器将开始增加
streamflow_publisher_pool_size
计数器,使用发布者池时,显示池大小(已创建但当前未使用的发布者)
streamflow_publisher_pool_in_use
计数器,使用发布者池时,显示当前有多少个发布者实例正在使用
错误处理
每个应用程序都会处理错误。我确信 - RabbitMQ处理器可能会遇到多种异常情况。在未处理异常的情况下,将创建IRabbitMqErrorHandler实例,该实例默认会将消息发送到错误队列。尽管你可以通过重写IRabbitMqConventions接口定义自己的错误队列名称,但默认行为仅将“:错误”后缀添加到消费者组队列的末尾,并将带有异常详细信息的消息发送到此队列。应用程序开发人员或支持人员以后可以决定如何处理这些消息。
产品 | 版本 兼容和额外计算的目标框架版本。 |
---|---|
.NET | net6.0 兼容。 net6.0-android 已计算。 net6.0-ios 已计算。 net6.0-maccatalyst 已计算。 net6.0-macos 已计算。 net6.0-tvos 已计算。 net6.0-windows 已计算。 net7.0 已计算。 net7.0-android 已计算。 net7.0-ios 已计算。 net7.0-maccatalyst 已计算。 net7.0-macos 已计算。 net7.0-tvos 已计算。 net7.0-windows 已计算。 net8.0 已计算。 net8.0-android 已计算。 net8.0-browser 已计算。 net8.0-ios 已计算。 net8.0-maccatalyst 已计算。 net8.0-macos 已计算。 net8.0-tvos 已计算。 net8.0-windows 已计算。 |
-
net6.0
- prometheus-net (>= 8.0.0)
- StreamFlow.RabbitMq (>= 9.1.0)
NuGet 包
此包没有被任何 NuGet 包使用。
GitHub 代码库
此包没有被任何流行的 GitHub 代码库使用。
版本 | 下载 | 最后更新 |
---|---|---|
9.1.0 | 8,456 | 4/19/2023 |
9.0.3 | 586 | 3/30/2023 |
9.0.2 | 176 | 3/29/2023 |
9.0.1 | 183 | 3/29/2023 |
9.0.0 | 203 | 3/29/2023 |
8.0.0 | 1,120 | 2/18/2023 |
7.4.0 | 323 | 2/11/2023 |
7.3.1 | 2,599 | 11/2/2022 |
7.3.0 | 548 | 10/28/2022 |
7.2.1 | 416 | 10/25/2022 |
7.2.0 | 344 | 10/25/2022 |
7.1.4 | 795 | 10/11/2022 |
7.1.3 | 372 | 10/11/2022 |
7.1.2 | 378 | 10/11/2022 |
7.1.1 | 359 | 10/10/2022 |
7.1.0 | 448 | 10/9/2022 |
7.0.0 | 337 | 10/9/2022 |
6.8.1 | 3,150 | 5/20/2022 |
6.8.0 | 376 | 5/20/2022 |
6.7.2 | 525 | 2/24/2022 |
6.7.1 | 442 | 1/23/2022 |
6.7.0 | 424 | 1/17/2022 |
6.6.1 | 426 | 1/13/2022 |
6.6.0 | 434 | 1/12/2022 |
6.5.1 | 285 | 12/20/2021 |
6.5.0 | 259 | 12/20/2021 |
6.4.0 | 249 | 12/20/2021 |
6.3.4 | 236 | 12/14/2021 |
6.3.3 | 235 | 12/14/2021 |
6.3.2 | 250 | 12/14/2021 |
6.3.1 | 264 | 12/14/2021 |
6.3.0 | 264 | 12/14/2021 |
6.2.1 | 245 | 12/9/2021 |
6.2.0 | 263 | 12/9/2021 |
6.1.2 | 252 | 12/8/2021 |
6.1.1 | 256 | 12/8/2021 |
6.1.0 | 262 | 12/8/2021 |
6.0.0 | 541 | 12/6/2021 |
5.2.0 | 279 | 11/23/2021 |
5.1.0 | 261 | 11/23/2021 |
5.0.0 | 295 | 11/14/2021 |
4.1.0 | 413 | 10/14/2021 |
4.0.3 | 307 | 9/20/2021 |
4.0.2 | 342 | 9/19/2021 |
4.0.1 | 282 | 9/19/2021 |
4.0.0 | 292 | 9/19/2021 |
3.9.0 | 317 | 9/19/2021 |
3.8.0 | 407 | 9/12/2021 |
3.7.0 | 359 | 9/12/2021 |
3.5.0 | 277 | 9/7/2021 |