StreamFlow.RabbitMq 9.1.0

dotnet add package StreamFlow.RabbitMq --version 9.1.0                
NuGet\Install-Package StreamFlow.RabbitMq -Version 9.1.0                
该命令旨在用于 Visual Studio 的包管理器控制台,因为它使用 NuGet 模块的版本 Install-Package.
<PackageReference Include="StreamFlow.RabbitMq" Version="9.1.0" />                
对于支持 PackageReference 的项目,请将此 XML 节点复制到项目文件中以引用包。
paket add StreamFlow.RabbitMq --version 9.1.0                
#r "nuget: StreamFlow.RabbitMq, 9.1.0"                
#r 指令可以在 F# Interactive 和 Polyglot Notebooks 中使用。将其复制到交互式工具或脚本的源代码中以引用包。
// Install StreamFlow.RabbitMq as a Cake Addin
#addin nuget:?package=StreamFlow.RabbitMq&version=9.1.0

// Install StreamFlow.RabbitMq as a Cake Tool
#tool nuget:?package=StreamFlow.RabbitMq&version=9.1.0                

StreamFlow - 另一个 RabbitMQ 处理程序

理由

我知道野外有很多酷的 RabbitMQ 包装器(MassTransit、EasyNetQ 以及更多),它们的工作效果远比这个库要好。然而,我从未能够不覆盖多个方法或类就使用它们。因此,我决定根据自己的需要制作自己的包装器。

通过 NuGet 安装

如果您想在项目中包含 HTTP 缸体,可以直接从 NuGet 安装它

要安装该包,请在包管理器控制台中运行以下命令

PM> Install-Package StreamFlow.RabbitMq

概念

在 RabbitMQ 服务器中,所有名称都可以通过实现 IRabbitMqConventions 并在依赖注入中注册来定义。但是默认行为如下

  • 交换名称使用请求类名称创建
  • 队列名称由 <request class name>:<request handler name>[:<service id>][:<consumer group name>] 创建
  • 错误队列名称使用 <request class name>:<request handler name>[:<service id>][:<consumer group name>]:Error 创建
A consumer group is a group of consumers that share the same group id. 
When a topic is consumed by consumers in the same group, every record 
will be delivered to only one consumer.
If all the consumer instances have the same consumer group, then the 
records will effectively be load-balanced over the consumer instances.
Service id identifies service implementation. As for example if we have 
multiple services like customers and orders - they will have different 
service id's. Most simple case here would be to use a short memorable
identifier like: "customers", "orders". 

The purpose of such id is to distinguish message handlers which can have
exactly the same name and can handle exactly the same message. Since these
are in different services - load balancing scenario is not acceptable in this case.

It can be also treated as a consumer group but in this case it applies to whole service.

组件生存期

连接

连接对象作为单例注册。但是消费者和发布者连接之间存在分离。这意味着单个服务实例可以最多连接到 RabbitMQ 服务器两次。

IPublisher

使用单例生命周期。内部使用容量为100,000项的Channel集合。如果代码中的发布速度过快,超出这个集合的处理速度,可能会接收到一个RabbitMqPublisherException异常,原因:InternalQueueIsFull。

在底层,发布者使用异步发布确认。这意味着每个PublishAsync实际上会(异步地)等待服务器确认。有关更详细的信息,请参阅RabbitMQ文档:[https://rabbitmq.cn/confirms.html#publisher-confirms](https://rabbitmq.cn/confirms.html#publisher-confirms)。这里还可以看到更多关于这种行为的行为示例:[https://rabbitmq.cn/tutorials/tutorial-seven-dotnet.html](https://rabbitmq.cn/tutorials/tutorial-seven-dotnet.html)。简而言之:如果您等待每个PublishAsync方法,实际上就像在这个示例中一样,您正在等待每个发布的确认。

public async Task PublishMessagesAsync<T>(T [] messages)
{
    foreach (var message in messages)
    {
      try
      {
          await _publisher.PublishAsync(message);
      }
      catch (Exception e)
      {
          // handle publication errors
      }
    }
}

上面提供的示例可能相对较慢,但它非常简单易实现。为了提高发布性能,可以采取类似的实现方法,如下所示

public async Task PublishMessagesAsync<T>(T [] messages)
{
    var tasks = new Task[messages.Length];
    for (var i = 0; i < messages.Length; ++i)
    {
        var message = messages[i];
        tasks[i] = _publisher.PublishAsync(message);
    }
    
    foreach (var task in tasks)
    {
        try
        {
            await task;
        }
        catch (Exception e)
        {
            // handle publication errors
        }
    }
}

注意:如果您没有指定取消令牌(或指定默认值),或者在发布选项中没有指定超时时间——库会自动添加60秒的超时时间。如果应用程序在此时间内未收到响应,则消息将被标记为已取消。

中间件

每个中间件都会作为瞬态添加。每个中间件将在每个接收或发布的消息上创建。

消费者

每个消费者都是单例的。但是,对于每个接收到的消息,都会创建一个自己的作用域,并在处理结束后进行释放。

处理程序

处理程序以范围实例的形式注册。

非常简单易用

定义一个消息

public class PingRequest
{
    public DateTime Timestamp { get; set; }
}

发布消息

public class PingController
{
    private readonly IPublisher _publisher;
    
    public PingController(IPublisher publisher)
    {
        _publisher = publisher;
    }
    
    public async Task SendPing()
    {
        await _publisher.PublishAsync(new PingRequest { Timestamp = DateTime.UtcNow; });
    }
}

定义消息消费者

public class PingRequestConsumer : IConsumer<PingRequest>
{
    public Task Handle(IMessage<PingRequest> message, CancellationToken cancellation)
    {
        Console.WriteLine(message.Body.Timestamp);
        return Task.CompletedTask;
    }
}

使用ASP.NET Core进行配置

services.AddStreamFlow(transport =>
{
    transport
        .UsingRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .StartConsumerHostedService()
        )
        .Consumers(builder => builder
            .Add<PingRequest, PingRequestConsumer>(options => options
                .ConsumerCount(5)
                .ConsumerGroup("gr1"))
            .Add<PingRequest, PingRequestConsumer>(options => options
                .ConsumerCount(5)
                .ConsumerGroup("gr2"))
        )
        .ConfigureConsumerPipe(builder => builder
            .Use<LogAppIdMiddleware>()
        )
        .ConfigurePublisherPipe(builder => builder
            .Use(_ => new SetAppIdMiddleware("Published from StreamFlow.Tests.AspNetCore"))
        );
});

上述代码注册了流流程类,配置了RabbitMQ连接,并指示启动配置的消费者。它配置了2个消费者组,每个组启动5个实例。

ConfigureConsumerPipe和ConfigurePublisherPipe注册了类似于中间件的动作,这些动作在消息接收或发布时执行。这些中间件的代码

// used at consumer side to log received AppId
public class LogAppIdMiddleware : IStreamFlowMiddleware
{
    private readonly ILogger<LogAppIdMiddleware> _logger;

    public LogAppIdMiddleware(ILogger<LogAppIdMiddleware> logger)
    {
        _logger = logger;
    }

    public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        if (!string.IsNullOrWhiteSpace(context.AppId))
        {
            _logger.LogInformation("AppId: {AppId}", context.AppId);
        }

        return next(context);
    }
}

// used at publisher side to set custom AppId
public class SetAppIdMiddleware : IStreamFlowMiddleware
{
    private readonly string _appId;

    public SetAppIdMiddleware(string appId)
    {
        _appId = appId;
    }

    public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        context.WithAppId(_appId);
        return next(context);
    }
}

尽管这些示例中间件非常简单,但在实际世界中,它可以实现非常强大的功能。

  • 重试
  • 异常处理
  • 日志记录
  • 度量
  • ...

MediatR

有一个MediatR库的实现,这可能会极大地简化消费者实现。

简单安装包

Install-Package StreamFlow.RabbitMq.MediatR

库期望MediatR本身已经配置并可在依赖注入容器中使用。

MediatR通知

请求类必须实现INotification接口,例如:

public class PingNotification : INotification
{
}

然后它可以在StreamFlow中使用

    ....
    .Consumers(builder => builder
        .AddNotification<PingNotification>()
    )
    ....

MediatR不带响应的请求

请求类必须实现IRequest接口,例如:

public class PingRequest : IRequest
{
}

然后它可以在StreamFlow中使用

    ....
    .Consumers(builder => builder
        .AddRequest<PingRequest>()
    )
    ....

MediatR带响应的请求

请求类必须实现IRequest接口,例如:

public class PongResponse
{
}

public class PingPongRequest : IRequest<PongResponse>
{
}

然后它可以在StreamFlow中使用

    ....
    .Consumers(builder => builder
        .AddRequest<PingPongRequest, PongResponse>()
    )
    ....

但是,在这种情况下,您会获得一个额外的行为:响应会通过RabbitMQ发布者总线发送回来。为了使这生效,您还需要启用发布者宿主(请参阅EnablePublisherHost调用)

services.AddStreamFlow(transport =>
{
    transport
        .UseRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .EnableConsumerHost(consumer => consumer.Prefetch(5))
            .WithPrometheusMetrics()
            .WithPublisherOptions(publisher => publisher
                .EnablePublisherHost()
            )
        )
}    

度量

库支持各种度量,代表关于发布者和消费者洞察的各种信息。开箱即用的Prometheus度量实现,但它肯定不仅仅限于它。

为了启用度量,添加Prometheus包

PM> Install-Package StreamFlow.RabbitMq.Prometheus

并在配置RabbitMQ时启用度量(请参阅:WithPrometheusMetrics)

services.AddStreamFlow(streamFlowOptions, transport =>
{
    transport
        .UseRabbitMq(mq => mq
            .Connection(options.Host, options.Username, options.Password, options.VirtualHost)
            .WithPrometheusMetrics()
            ....
        );
});

Prometheus度量

  • streamflow_messages_published

    直方图,表示发布消息的数量和持续时间(秒)

    标签:exchange,state [完成或失败]

  • streamflow_bus_messages_published

    直方图,表示在使用发布者宿主时发布消息的数量和持续时间(秒)

    标签:exchange,state [完成或失败]

  • streamflow_messages_publishing_events

    直方图,表示在发布过程中发生的各种事件,如渠道创建、准备、序列化、事务提交等,有助于查看发布器进程是否存在任何瓶颈

    标签:交换、事件

  • streamflow_messages_publishing_errors

    计数器,表示在消息发布过程中发生的异常

    标签:交换

  • streamflow_bus_messages_publishing_errors

    计数器,表示在发布器主机使用过程中消息发布时发生的异常

    标签:交换

  • streamflow.messages.consumed

    直方图,消费消息计数和消费者持续时间(秒)

    标签:交换、队列、状态

  • streamflow.messages.consumed.errors

    计数器,表示在消费者过程中获得的异常

    标签:交换、队列

  • streamflow.messages.busPublishing

    计数器,显示使用了发布器主机发布的消息数量

  • streamflow.messages.busPublishing.errors

    计数器,表示通过发布器主机发布时的错误,因为发布器主机的通道是有限制的 - 当发布器主机接收的消息比实际能发送到RabbitMQ的消息更多时,它将开始增加

  • streamflow.publisher.pool_size

    计数器,使用发布器池时,显示池的大小(已创建但当前未使用的发布器)

  • streamflow.publisher.pool_in_use

    计数器,使用发布器池时,显示当前正在使用的发布器实例数量

错误处理

每个应用程序都处理错误。我很有信心 - RabbitMQ处理器可以进入多个异常情况。在不处理异常的情况下,会创建IRabbitMqErrorHandler实例,默认情况下会将消息发送到错误队列。尽管您可以覆盖IRabbitMqConventions接口来定义自己的错误队列名称,但默认行为只是简单地将":Error"后缀添加到消费者组队列的末尾,并将包含异常详细信息的消息发送到该队列。应用程序开发人员或支持人员可以稍后决定对这些消息进行何操作。

产品 兼容和额外的计算目标框架版本。
.NET net6.0 兼容。 net6.0-android 已计算。 net6.0-ios 已计算。 net6.0-maccatalyst 已计算。 net6.0-macos 已计算。 net6.0-tvos 已计算。 net6.0-windows 已计算。 net7.0 已计算。 net7.0-android 已计算。 net7.0-ios 已计算。 net7.0-maccatalyst 已计算。 net7.0-macos 已计算。 net7.0-tvos 已计算。 net7.0-windows 已计算。 net8.0 已计算。 net8.0-android 已计算。 net8.0-browser 已计算。 net8.0-ios 已计算。 net8.0-maccatalyst 已计算。 net8.0-macos 已计算。 net8.0-tvos 已计算。 net8.0-windows 已计算。
兼容的目标框架
包含的目标框架(在包内)
了解更多关于 目标框架.NET Standard 的信息。

NuGet 包 (2)

显示依赖 StreamFlow.RabbitMq 的前 2 个 NuGet 包

下载
StreamFlow.RabbitMq.Prometheus

RabbitMQ 客户端包装器

StreamFlow.RabbitMq.MediatR

RabbitMQ 客户端包装器

GitHub 仓库

此包未在任何流行的 GitHub 仓库中使用。

版本 下载 最后更新
9.1.0 8,695 4/19/2023
9.0.3 659 3/30/2023
9.0.2 227 3/29/2023
9.0.1 249 3/29/2023
9.0.0 264 3/29/2023
8.0.0 1,231 2/18/2023
7.4.0 422 2/11/2023
7.3.1 2,804 11/2/2022
7.3.0 741 10/28/2022
7.2.1 623 10/25/2022
7.2.0 543 10/25/2022
7.1.4 1,001 10/11/2022
7.1.3 589 10/11/2022
7.1.2 597 10/11/2022
7.1.1 598 10/10/2022
7.1.0 642 10/9/2022
7.0.0 580 10/9/2022
6.8.1 3,554 5/20/2022
6.8.0 575 5/20/2022
6.7.2 884 2/24/2022
6.7.1 762 1/23/2022
6.7.0 750 1/17/2022
6.6.1 686 1/13/2022
6.6.0 762 1/12/2022
6.5.1 353 12/20/2021
6.5.0 348 12/20/2021
6.4.0 348 12/20/2021
6.3.4 317 12/14/2021
6.3.3 308 12/14/2021
6.3.2 311 12/14/2021
6.3.1 335 12/14/2021
6.3.0 386 12/14/2021
6.2.1 349 12/9/2021
6.2.0 324 12/9/2021
6.1.2 325 12/8/2021
6.1.1 349 12/8/2021
6.1.0 357 12/8/2021
6.0.0 616 12/6/2021
5.2.0 350 11/23/2021
5.1.0 364 11/23/2021
5.0.0 378 11/14/2021
4.1.0 518 10/14/2021
4.0.3 384 9/20/2021
4.0.2 439 9/19/2021
4.0.1 360 9/19/2021
4.0.0 379 9/19/2021
3.9.0 387 9/19/2021
3.8.0 494 9/12/2021
3.7.0 445 9/12/2021
3.5.0 378 9/7/2021
3.4.0 370 9/7/2021
3.3.0 306 9/7/2021
3.2.0 298 9/7/2021
3.1.0 314 9/2/2021
3.0.0 304 9/1/2021
2.5.0 466 5/4/2021
2.3.0 289 5/4/2021
2.2.0 326 4/27/2021
2.1.0 325 4/26/2021
2.0.0 291 4/26/2021