StreamFlow.RabbitMq.MediatR 9.1.0

dotnet add package StreamFlow.RabbitMq.MediatR --version 9.1.0                
NuGet\Install-Package StreamFlow.RabbitMq.MediatR -Version 9.1.0                
此命令旨在在 Visual Studio 的包管理器控制台中使用,因为它使用 NuGet 模块的 Install-Package 版本。
<PackageReference Include="StreamFlow.RabbitMq.MediatR" Version="9.1.0" />                
对于支持 PackageReference 的项目,将此 XML 节点复制到项目文件中以引用包。
paket add StreamFlow.RabbitMq.MediatR --version 9.1.0                
#r "nuget: StreamFlow.RabbitMq.MediatR, 9.1.0"                
#r 指令可用于 F# Interactive 和 Polyglot Notebooks。将此复制到交互式工具或脚本的源代码中,以引用包。
// Install StreamFlow.RabbitMq.MediatR as a Cake Addin
#addin nuget:?package=StreamFlow.RabbitMq.MediatR&version=9.1.0

// Install StreamFlow.RabbitMq.MediatR as a Cake Tool
#tool nuget:?package=StreamFlow.RabbitMq.MediatR&version=9.1.0                

StreamFlow - 另一个 RabbitMQ 处理器

理由

我知道在野外有许多多个酷的 RabbitMQ 包装器(MassTransit、EasyNetQ 及其他),它们的工作效果比这个库好得多。然而,我从未能够在不重写多个方法或类的情况下使用它们。因此,我决定根据自己的需求制作自己的包装器。

通过 NuGet 安装

如果您想在项目中包含 HTTP 缓冲区,您可以直接从 NuGet 中 安装它

要安装包,请在包管理器控制台运行以下命令

PM> Install-Package StreamFlow.RabbitMq

概念

RabbitMQ 服务器中的所有名称都可以通过实现 IRabbitMqConventions 并在依赖注入中注册来定义。但是默认行为是这样的

  • 交换名称是通过请求类名称创建的
  • 队列名称是通过 <request class name>:<request handler name>[:<service id>][:<consumer group name> 创建的
  • 错误队列名称是通过 <request class name>:<request handler name>[:<service id>][:<consumer group name>]:Error 创建的
A consumer group is a group of consumers that share the same group id. 
When a topic is consumed by consumers in the same group, every record 
will be delivered to only one consumer.
If all the consumer instances have the same consumer group, then the 
records will effectively be load-balanced over the consumer instances.
Service id identifies service implementation. As for example if we have 
multiple services like customers and orders - they will have different 
service id's. Most simple case here would be to use a short memorable
identifier like: "customers", "orders". 

The purpose of such id is to distinguish message handlers which can have
exactly the same name and can handle exactly the same message. Since these
are in different services - load balancing scenario is not acceptable in this case.

It can be also treated as a consumer group but in this case it applies to whole service.

组件生命周期

连接

连接对象注册为单例。但是消费者和发布者连接之间存在分离。这意味着单个服务实例可以到 RabbitMQ 服务器有两个最大连接。

IPublisher

使用单例生命周期。内部使用具有 100_000 项容量的 Channel 集合。如果代码中的发布很频繁,则此集合会被处理 - 你可能收到 RabbitMqPublisherException 并提示“InternalQueueIsFull”。

底层发布者使用异步发布确认,这意味着每个 PublishAsync 实际上是(异步地)等待服务器确认。有关更详细的解释,请参阅 RabbitMQ 文档:[https://rabbitmq.cn/confirms.html#publisher-confirms](https://rabbitmq.cn/confirms.html#publisher-confirms)。在此处,您还可以看到该行为的更多示例:[https://rabbitmq.cn/tutorials/tutorial-seven-dotnet.html](https://rabbitmq.cn/tutorials/tutorial-seven-dotnet.html)。总结一下:如果您在每个 PublishAsync 方法上等待,实际上就像这个示例一样等待每条发布的确认。

public async Task PublishMessagesAsync<T>(T [] messages)
{
    foreach (var message in messages)
    {
      try
      {
          await _publisher.PublishAsync(message);
      }
      catch (Exception e)
      {
          // handle publication errors
      }
    }
}

上面的示例可能相对较慢,但实现起来非常简单。为了提高发布性能,您可以采取与以下示例类似的方法。

public async Task PublishMessagesAsync<T>(T [] messages)
{
    var tasks = new Task[messages.Length];
    for (var i = 0; i < messages.Length; ++i)
    {
        var message = messages[i];
        tasks[i] = _publisher.PublishAsync(message);
    }
    
    foreach (var task in tasks)
    {
        try
        {
            await task;
        }
        catch (Exception e)
        {
            // handle publication errors
        }
    }
}

注意:如果您未指定取消令牌(或指定默认值),或者在发布选项中没有指定超时,库会自动添加 60 秒的超时。如果在指定时间内未收到响应,消息将被标记为已取消。

中间件

每个中间件都会被添加为瞬态的。每个中间件都会在收到或发布每条消息时创建。

消费者

每个消费者都是单例的。然而,对于收到的每条消息,都会创建和销毁自己的作用域。

处理器

处理器被注册为作用域实例。

使用非常简单

定义消息

public class PingRequest
{
    public DateTime Timestamp { get; set; }
}

发布消息

public class PingController
{
    private readonly IPublisher _publisher;
    
    public PingController(IPublisher publisher)
    {
        _publisher = publisher;
    }
    
    public async Task SendPing()
    {
        await _publisher.PublishAsync(new PingRequest { Timestamp = DateTime.UtcNow; });
    }
}

定义消息消费者

public class PingRequestConsumer : IConsumer<PingRequest>
{
    public Task Handle(IMessage<PingRequest> message, CancellationToken cancellation)
    {
        Console.WriteLine(message.Body.Timestamp);
        return Task.CompletedTask;
    }
}

使用 ASP.NET Core 配置

services.AddStreamFlow(transport =>
{
    transport
        .UsingRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .StartConsumerHostedService()
        )
        .Consumers(builder => builder
            .Add<PingRequest, PingRequestConsumer>(options => options
                .ConsumerCount(5)
                .ConsumerGroup("gr1"))
            .Add<PingRequest, PingRequestConsumer>(options => options
                .ConsumerCount(5)
                .ConsumerGroup("gr2"))
        )
        .ConfigureConsumerPipe(builder => builder
            .Use<LogAppIdMiddleware>()
        )
        .ConfigurePublisherPipe(builder => builder
            .Use(_ => new SetAppIdMiddleware("Published from StreamFlow.Tests.AspNetCore"))
        );
});

上面的代码注册了流流类,配置 RabbitMQ 连接,并指示启动配置的消费者 ASP.NET Core 主机服务。它配置了 2 个消费者分组,每组启动 5 个实例。

ConfigureConsumerPipe 和 ConfigurePublisherPipe 注册了类似于中间件的操作,这些操作会在收到消息或发布时执行。这些中间件代码

// used at consumer side to log received AppId
public class LogAppIdMiddleware : IStreamFlowMiddleware
{
    private readonly ILogger<LogAppIdMiddleware> _logger;

    public LogAppIdMiddleware(ILogger<LogAppIdMiddleware> logger)
    {
        _logger = logger;
    }

    public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        if (!string.IsNullOrWhiteSpace(context.AppId))
        {
            _logger.LogInformation("AppId: {AppId}", context.AppId);
        }

        return next(context);
    }
}

// used at publisher side to set custom AppId
public class SetAppIdMiddleware : IStreamFlowMiddleware
{
    private readonly string _appId;

    public SetAppIdMiddleware(string appId)
    {
        _appId = appId;
    }

    public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        context.WithAppId(_appId);
        return next(context);
    }
}

尽管这些示例中间件在网络中非常简单,但在现实世界的场景中可以创建非常强大的实现。

  • 重试
  • 异常处理
  • 日志记录
  • 度量
  • ...

MediatR

存在对 MediatR 库的实现,这可能大大简化消费者实现。

简单地安装包

Install-Package StreamFlow.RabbitMq.MediatR

库期望 MediatR 已经在依赖注入容器中配置并可用。

MediatR 通知

请求类必须实现 INotification 接口,例如

public class PingNotification : INotification
{
}

然后它可以用于 StreamFlow

    ....
    .Consumers(builder => builder
        .AddNotification<PingNotification>()
    )
    ....

MediatR 无响应请求

请求类必须实现 IRequest 接口,例如

public class PingRequest : IRequest
{
}

然后它可以用于 StreamFlow

    ....
    .Consumers(builder => builder
        .AddRequest<PingRequest>()
    )
    ....

MediatR 带响应请求

请求类必须实现 IRequest 接口,例如

public class PongResponse
{
}

public class PingPongRequest : IRequest<PongResponse>
{
}

然后它可以用于 StreamFlow

    ....
    .Consumers(builder => builder
        .AddRequest<PingPongRequest, PongResponse>()
    )
    ....

然而,在这种情况下,您会得到额外的行为:响应将通过 RabbitMQ 发布者总线发送。为此,您还需要启用发布者主服务器(请参阅 EnablePublisherHost 调用)

services.AddStreamFlow(transport =>
{
    transport
        .UseRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .EnableConsumerHost(consumer => consumer.Prefetch(5))
            .WithPrometheusMetrics()
            .WithPublisherOptions(publisher => publisher
                .EnablePublisherHost()
            )
        )
}    

度量

库支持各种度量指标,表示有关发布者和消费者洞察的各种信息。开箱即用,有一个 Prometheus 度量指标实现的实现,但无疑不仅限于它。

为了启用度量,添加 Prometheus 包

PM> Install-Package StreamFlow.RabbitMq.Prometheus

并在配置 RabbitMQ 时启用度量(请参阅:WithPrometheusMetrics)

services.AddStreamFlow(streamFlowOptions, transport =>
{
    transport
        .UseRabbitMq(mq => mq
            .Connection(options.Host, options.Username, options.Password, options.VirtualHost)
            .WithPrometheusMetrics()
            ....
        );
});

Prometheus 度量

  • streamflow_messages_published

    直方图,表示发布消息的数量和持续时间(秒)

    标签:exchange,state [完成的或失败的]

  • streamflow_bus_messages_published

    直方图,表示使用发布者主服务器时发布消息的数量和持续时间(秒)

    标签:exchange,state [完成的或失败的]

  • streamflow_messages_publishing_events

    直方图,表示在发布过程中发生的各种事件,如通道创建、准备工作、序列化、事务提交等,有助于了解发布过程中是否有瓶颈

    标签:exchange,event

  • 流量信息发布错误

    计数器,表示在信息发布过程中发生的异常

    标签:交换机

  • 流量系统总线信息发布错误

    计数器,表示在使用发布者主机时信息发布过程中发生的异常

    标签:交换机

  • 流量信息消费

    直方图,消费的消息数及消费者持续时间(秒)

    标签:交换机、队列、状态

  • 流量信息消费错误

    计数器,表示在消费过程中遇到的异常

    标签:交换机、队列

  • 流量系统总线信息发布

    计数器,显示使用发布者主机发布的消息数量

  • 流量系统总线信息发布错误

    计数器,表示由于发布者主机通道限制(当发布者主机接收的消息数量超过能发送到RabbitMQ的数量时开始增加)而发布到发布者主机的错误

  • 流量发布者池大小

    计数器,使用发布者池时显示池大小(已创建但当前未使用的发布者)

  • 流量发布者池使用中

    计数器,使用发布者池时显示当前正在使用中的发布者实例数量

错误处理

每个应用程序都会处理错误。我相信——RabbitMQ处理程序可以遇到多个异常情况。在未处理异常的情况下,将创建一个IRabbitMqErrorHandler实例,该实例默认情况下将消息发送到错误队列。尽管您可以通过覆盖IRabbitMqConventions接口来定义自己的错误队列名称,但默认行为是在消费者组队列名后添加":Error"后缀,然后将包含异常细节的消息头发送到该队列。应用程序开发者或支持人员可以在以后决定对这些消息的处理方式。

产品 兼容和额外计算的Target Framework版本。
.NET net6.0 兼容。 net6.0-android 已计算。 net6.0-ios 已计算。 net6.0-maccatalyst 已计算。 net6.0-macos 已计算。 net6.0-tvos 已计算。 net6.0-windows 已计算。 net7.0 已计算。 net7.0-android 已计算。 net7.0-ios 已计算。 net7.0-maccatalyst 已计算。 net7.0-macos 已计算。 net7.0-tvos 已计算。 net7.0-windows 已计算。 net8.0 已计算。 net8.0-android 已计算。 net8.0-browser 已计算。 net8.0-ios 已计算。 net8.0-maccatalyst 已计算。 net8.0-macos 已计算。 net8.0-tvos 已计算。 net8.0-windows 已计算。
兼容的目标框架
包含的目标框架(在包中)
详细了解目标框架.NET Standard

NuGet包

此包未被任何NuGet包使用。

GitHub仓库

此包未被任何流行的GitHub仓库使用。

版本 下载 最后更新
9.1.0 4,543 4/19/2023
9.0.3 396 3/30/2023
9.0.2 169 3/29/2023
9.0.1 176 3/29/2023
9.0.0 215 3/29/2023
8.0.0 571 2/18/2023
7.4.0 304 2/11/2023
7.3.1 807 11/2/2022
7.3.0 503 10/28/2022
7.2.1 368 10/25/2022
7.2.0 351 10/25/2022
7.1.4 504 10/11/2022
7.1.3 363 10/11/2022
7.1.2 392 10/11/2022
7.1.1 372 10/10/2022
7.1.0 420 10/9/2022
7.0.0 385 10/9/2022
6.8.1 911 5/20/2022
6.8.0 378 5/20/2022
6.7.2 429 2/24/2022
6.7.1 401 1/23/2022
6.7.0 436 1/17/2022
6.6.1 254 1/13/2022
6.6.0 403 1/12/2022