StreamFlow 9.1.0
dotnet add package StreamFlow --version 9.1.0
NuGet\Install-Package StreamFlow -Version 9.1.0
<PackageReference Include="StreamFlow" Version="9.1.0" />
paket add StreamFlow --version 9.1.0
#r "nuget: StreamFlow, 9.1.0"
// Install StreamFlow as a Cake Addin #addin nuget:?package=StreamFlow&version=9.1.0 // Install StreamFlow as a Cake Tool #tool nuget:?package=StreamFlow&version=9.1.0
StreamFlow - 另一个 RabbitMQ 处理程序
理由
我知道野外有多款出色的 RabbitMQ 包装器(MassTransit、EasyNetQ 以及更多),它们比这个库做得更好。然而,我从未 able to 使用它们而不需要覆盖多个方法或类。所以我决定根据自己的需要制作一个自己的包装器。
通过 NuGet 安装
如果您想将 HTTP 沉降至项目中,您可以直接从 NuGet 安装它。
要安装包,请在包管理器控制台中运行以下命令
PM> Install-Package StreamFlow.RabbitMq
概念
RabbitMQ 服务器中的所有名称都可以通过实现 IRabbitMqConventions 并注册在依赖注入中来实现。但是默认行为如下
- 交换机名称使用请求类名称创建
- 队列名称使用
<request class name>:<request handler name>[:<service id>][:<consumer group name>]
创建 - 错误队列名称使用
<request class name>:<request handler name>[:<service id>][:<consumer group name>]:Error
创建
A consumer group is a group of consumers that share the same group id.
When a topic is consumed by consumers in the same group, every record
will be delivered to only one consumer.
If all the consumer instances have the same consumer group, then the
records will effectively be load-balanced over the consumer instances.
Service id identifies service implementation. As for example if we have
multiple services like customers and orders - they will have different
service id's. Most simple case here would be to use a short memorable
identifier like: "customers", "orders".
The purpose of such id is to distinguish message handlers which can have
exactly the same name and can handle exactly the same message. Since these
are in different services - load balancing scenario is not acceptable in this case.
It can be also treated as a consumer group but in this case it applies to whole service.
组件生命周期
连接
连接对象注册为单例。但是消费者和发布者连接之间存在分离。这意味着单个服务实例可以连接到 RabbitMQ 服务器两次的最大。
IPublisher
使用单例生命周期。内部它使用具有 100_000 项固定大小的 Channel 集合。如果代码中的发布操作发生得比此集合处理得快得多,则您可能会收到具有原因:“InternalQueueIsFull”的 RabbitMqPublisherException。
在内部,发布者使用异步发布确认。这意味着每个 PublishAsync 实际上是(异步)等待服务器确认。有关更详细的说明,请参阅 RabbitMQ 文档:https://rabbitmq.cn/confirms.html#publisher-confirms 在这里,您还可以看到更多关于该行为的行为示例:https://rabbitmq.cn/tutorials/tutorial-seven-dotnet.html 总结一下:如果对每个 PublishAsync 方法进行等待,实际上是等待每种发布的确认,如本例所示
public async Task PublishMessagesAsync<T>(T [] messages)
{
foreach (var message in messages)
{
try
{
await _publisher.PublishAsync(message);
}
catch (Exception e)
{
// handle publication errors
}
}
}
上面的例子可能相对较慢,但是实现起来非常简单。为了改善发布性能,您可以采取与本例类似的方案
public async Task PublishMessagesAsync<T>(T [] messages)
{
var tasks = new Task[messages.Length];
for (var i = 0; i < messages.Length; ++i)
{
var message = messages[i];
tasks[i] = _publisher.PublishAsync(message);
}
foreach (var task in tasks)
{
try
{
await task;
}
catch (Exception e)
{
// handle publication errors
}
}
}
注意:如果在发布选项中不指定取消令牌(或指定默认值),或者没有指定超时,库将自动添加60秒超时。如果在此期间应用程序没有收到响应,则消息将标记为已取消。
中间件
每个中间件都作为瞬态添加。每个中间件都会在处理每个接收或发布的消息时创建。
消费者
每个消费者都是单例。但是,对于每个接收到的消息都会创建一个自己的作用域,并在处理结束后进行清理。
处理器
处理器以作用域实例的形式注册。
非常简单易用
定义消息
public class PingRequest
{
public DateTime Timestamp { get; set; }
}
发布消息
public class PingController
{
private readonly IPublisher _publisher;
public PingController(IPublisher publisher)
{
_publisher = publisher;
}
public async Task SendPing()
{
await _publisher.PublishAsync(new PingRequest { Timestamp = DateTime.UtcNow; });
}
}
定义消息消费者
public class PingRequestConsumer : IConsumer<PingRequest>
{
public Task Handle(IMessage<PingRequest> message, CancellationToken cancellation)
{
Console.WriteLine(message.Body.Timestamp);
return Task.CompletedTask;
}
}
使用 ASP.NET Core 进行配置
services.AddStreamFlow(transport =>
{
transport
.UsingRabbitMq(mq => mq
.Connection("localhost", "guest", "guest")
.StartConsumerHostedService()
)
.Consumers(builder => builder
.Add<PingRequest, PingRequestConsumer>(options => options
.ConsumerCount(5)
.ConsumerGroup("gr1"))
.Add<PingRequest, PingRequestConsumer>(options => options
.ConsumerCount(5)
.ConsumerGroup("gr2"))
)
.ConfigureConsumerPipe(builder => builder
.Use<LogAppIdMiddleware>()
)
.ConfigurePublisherPipe(builder => builder
.Use(_ => new SetAppIdMiddleware("Published from StreamFlow.Tests.AspNetCore"))
);
});
上面的代码注册了流流程类,配置了 RabbitMQ 连接,并指示启动配置的 ASP.NET Core 托管服务,该服务启动了配置的消费者。它配置了 2 个消费者组,每个组启动 5 个实例。
ConfigureConsumerPipe 和 ConfigurePublisherPipe 注册了类似于中间件的操作,当接收到消息或发布时执行。这些中间件的代码
// used at consumer side to log received AppId
public class LogAppIdMiddleware : IStreamFlowMiddleware
{
private readonly ILogger<LogAppIdMiddleware> _logger;
public LogAppIdMiddleware(ILogger<LogAppIdMiddleware> logger)
{
_logger = logger;
}
public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
{
if (!string.IsNullOrWhiteSpace(context.AppId))
{
_logger.LogInformation("AppId: {AppId}", context.AppId);
}
return next(context);
}
}
// used at publisher side to set custom AppId
public class SetAppIdMiddleware : IStreamFlowMiddleware
{
private readonly string _appId;
public SetAppIdMiddleware(string appId)
{
_appId = appId;
}
public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
{
context.WithAppId(_appId);
return next(context);
}
}
尽管这些中间件示例非常简单,但在实际情况下可以创建非常强大的实现
- 重试
- 异常处理
- 日志记录
- 度量
- ...
MediatR
有一个用于 MediatR 库的实现,这可能会极大地简化消费者实现。
简单地安装包
Install-Package StreamFlow.RabbitMq.MediatR
库期望 MediatR 已经在依赖注入容器中配置并可用。
MediatR 通知
请求类必须实现 INotification 接口,例如
public class PingNotification : INotification
{
}
然后它可以在 StreamFlow 中使用
....
.Consumers(builder => builder
.AddNotification<PingNotification>()
)
....
MediatR 请求(无响应)
请求类必须实现 IRequest 接口,例如
public class PingRequest : IRequest
{
}
然后它可以在 StreamFlow 中使用
....
.Consumers(builder => builder
.AddRequest<PingRequest>()
)
....
MediatR 请求(带响应)
请求类必须实现 IRequest 接口,例如
public class PongResponse
{
}
public class PingPongRequest : IRequest<PongResponse>
{
}
然后它可以在 StreamFlow 中使用
....
.Consumers(builder => builder
.AddRequest<PingPongRequest, PongResponse>()
)
....
然而,在这种情况下,您将获得额外的行为:响应将通过 RabbitMQ 发布者总线发送回。为了使其工作,您还需要启用发布主机(请参阅 EnablePublisherHost 调用)
services.AddStreamFlow(transport =>
{
transport
.UseRabbitMq(mq => mq
.Connection("localhost", "guest", "guest")
.EnableConsumerHost(consumer => consumer.Prefetch(5))
.WithPrometheusMetrics()
.WithPublisherOptions(publisher => publisher
.EnablePublisherHost()
)
)
}
度量
库支持各种度量,代表着关于发布者和消费者的各种信息。默认情况下,它提供了 Prometheus 度量的实现,但它当然不仅限于它。
为了启用度量,请添加 Prometheus 包
PM> Install-Package StreamFlow.RabbitMq.Prometheus
并在配置 RabbitMQ 时启用度量(请参阅:WithPrometheusMetrics)
services.AddStreamFlow(streamFlowOptions, transport =>
{
transport
.UseRabbitMq(mq => mq
.Connection(options.Host, options.Username, options.Password, options.VirtualHost)
.WithPrometheusMetrics()
....
);
});
Prometheus 度量
streamflow_messages_published
直方图,表示发布消息的数量和持续时间(秒)
标签:exchange,状态 [完成或失败]
streamflow_bus_messages_published
直方图,表示使用发布者主机时发布消息的数量和持续时间(秒)
标签:exchange,状态 [完成或失败]
streamflow_messages_publishing_events
直方图,表示发布过程中发生的各种事件,如通道创建、准备、序列化、事务提交等,有助于查看发布过程中是否存在任何瓶颈
标签:exchange,事件
streamflow_messages_publishing_errors
计数器,表示消息发布过程中发生的异常
标签:exchange
streamflow_bus_messages_publishing_errors
计数器,表示使用发布者主机时消息发布过程中发生的异常
标签:exchange
streamflow_messages_consumed
直方图,表示消费消息的数量和消费者持续时间(秒)
标签:交换,队列,状态
流流量消息消费错误
计数器,表示消费过程中遇到的异常
标签:交换,队列
流流量消息总线发布
计数器,显示使用发布者主机发布的消息数量
流流量消息总线发布错误
计数器,表示通过发布者主机发布时的错误,由于发布者主机通道是有限的——当发布者主机接收的消息多于其能实际发送到RabbitMQ的消息时,该计数器将开始增加
流流量发布者池大小
计数器,在启用发布者池时,显示池的大小(已创建但当前未使用的发布者)
流流量发布者池当前使用中
计数器,在启用发布者池时,显示当前有多少发布者实例正在使用
错误处理
每个应用程序都要处理错误。我相当确定——RabbitMQ处理程序可能会遇到多个异常情况。在出现未处理的异常时,将创建IRabbitMqErrorHandler实例,这将默认将消息发送到错误队列。尽管您可以通过覆盖IRabbitMqConventions接口来定义自己的错误队列名称,但默认行为只是在消费者组队列的末尾添加“:错误”后缀,并将带有异常详细信息的消息发送到该队列。应用程序开发人员或支持人员可以稍后决定对这些消息做什么。
产品 | 版本 兼容的和额外计算的目标框架版本。 |
---|---|
.NET | net6.0是兼容的。 net6.0-android已被计算。 net6.0-ios已被计算。 net6.0-maccatalyst已被计算。 net6.0-macos已被计算。 net6.0-tvos已被计算。 net6.0-windows已被计算。 net7.0已被计算。 net7.0-android已被计算。 net7.0-ios已被计算。 net7.0-maccatalyst已被计算。 net7.0-macos已被计算。 net7.0-tvos已被计算。 net7.0-windows已被计算。 net8.0已被计算。 net8.0-android已被计算。 net8.0-browser已被计算。 net8.0-ios已被计算。 net8.0-maccatalyst已被计算。 net8.0-macos已被计算。 net8.0-tvos已被计算。 net8.0-windows已被计算。 |
-
net6.0
- 无依赖项。
NuGet包 (2)
显示依赖于StreamFlow的前两个NuGet包
包 | 下载 |
---|---|
StreamFlow.RabbitMq
RabbitMQ客户端包装器 |
|
StreamFlow.Outbox.EntityFrameworkCore
RabbitMQ客户端包装器 |
GitHub仓库
此包不被任何流行的GitHub仓库使用。
版本 | 下载 | 最后更新 |
---|---|---|
9.1.0 | 8,648 | 4/19/2023 |
9.0.3 | 688 | 3/30/2023 |
9.0.2 | 273 | 3/29/2023 |
9.0.1 | 270 | 3/29/2023 |
9.0.0 | 284 | 3/29/2023 |
8.0.0 | 1,271 | 2/18/2023 |
7.4.0 | 484 | 2/11/2023 |
7.3.1 | 2,870 | 11/2/2022 |
7.3.0 | 812 | 10/28/2022 |
7.2.1 | 720 | 10/25/2022 |
7.2.0 | 651 | 10/25/2022 |
7.1.4 | 1,124 | 10/11/2022 |
7.1.3 | 696 | 10/11/2022 |
7.1.2 | 703 | 10/11/2022 |
7.1.1 | 694 | 10/10/2022 |
7.1.0 | 763 | 10/9/2022 |
7.0.0 | 663 | 10/9/2022 |
6.8.1 | 3,628 | 5/20/2022 |
6.8.0 | 693 | 5/20/2022 |
6.7.2 | 1,053 | 2/24/2022 |
6.7.1 | 929 | 1/23/2022 |
6.7.0 | 900 | 1/17/2022 |
6.6.1 | 807 | 1/13/2022 |
6.6.0 | 900 | 1/12/2022 |
6.5.1 | 404 | 12/20/2021 |
6.5.0 | 379 | 12/20/2021 |
6.4.0 | 385 | 12/20/2021 |
6.3.4 | 381 | 12/14/2021 |
6.3.3 | 378 | 12/14/2021 |
6.3.2 | 381 | 12/14/2021 |
6.3.1 | 392 | 12/14/2021 |
6.3.0 | 459 | 12/14/2021 |
6.2.1 | 406 | 12/9/2021 |
6.2.0 | 385 | 12/9/2021 |
6.1.2 | 384 | 12/8/2021 |
6.1.1 | 400 | 12/8/2021 |
6.1.0 | 412 | 12/8/2021 |
6.0.0 | 653 | 12/6/2021 |
5.2.0 | 670 | 11/23/2021 |
5.1.0 | 655 | 11/23/2021 |
5.0.0 | 420 | 11/14/2021 |
4.1.0 | 912 | 10/14/2021 |
4.0.3 | 805 | 9/20/2021 |
4.0.2 | 832 | 9/19/2021 |
4.0.1 | 769 | 9/19/2021 |
4.0.0 | 780 | 9/19/2021 |
3.9.0 | 834 | 9/19/2021 |
3.8.0 | 894 | 9/12/2021 |
3.7.0 | 866 | 9/12/2021 |
3.5.0 | 772 | 9/7/2021 |
3.4.0 | 769 | 9/7/2021 |
3.3.0 | 735 | 9/7/2021 |
3.2.0 | 735 | 9/7/2021 |
3.1.0 | 721 | 9/2/2021 |
3.0.0 | 369 | 9/1/2021 |
2.5.0 | 535 | 5/4/2021 |
2.3.0 | 355 | 5/4/2021 |
2.2.0 | 396 | 4/27/2021 |
2.1.0 | 385 | 4/26/2021 |
2.0.0 | 365 | 4/26/2021 |