StreamFlow 9.1.0

dotnet add package StreamFlow --version 9.1.0                
NuGet\Install-Package StreamFlow -Version 9.1.0                
此命令旨在在 Visual Studio 的包管理器控制台中使用,因为它使用 NuGet 模块的 Install-Package 版本。
<PackageReference Include="StreamFlow" Version="9.1.0" />                
对于支持 PackageReference 的项目,将此 XML 节点复制到项目文件中以引用此包。
paket add StreamFlow --version 9.1.0                
#r "nuget: StreamFlow, 9.1.0"                
#r 指令可用于 F# Interactive 和 Polyglot Notebooks。将此内容复制到交互式工具或脚本的源代码中,以引用包。
// Install StreamFlow as a Cake Addin
#addin nuget:?package=StreamFlow&version=9.1.0

// Install StreamFlow as a Cake Tool
#tool nuget:?package=StreamFlow&version=9.1.0                

StreamFlow - 另一个 RabbitMQ 处理程序

理由

我知道野外有多款出色的 RabbitMQ 包装器(MassTransit、EasyNetQ 以及更多),它们比这个库做得更好。然而,我从未 able to 使用它们而不需要覆盖多个方法或类。所以我决定根据自己的需要制作一个自己的包装器。

通过 NuGet 安装

如果您想将 HTTP 沉降至项目中,您可以直接从 NuGet 安装它。

要安装包,请在包管理器控制台中运行以下命令

PM> Install-Package StreamFlow.RabbitMq

概念

RabbitMQ 服务器中的所有名称都可以通过实现 IRabbitMqConventions 并注册在依赖注入中来实现。但是默认行为如下

  • 交换机名称使用请求类名称创建
  • 队列名称使用 <request class name>:<request handler name>[:<service id>][:<consumer group name>] 创建
  • 错误队列名称使用 <request class name>:<request handler name>[:<service id>][:<consumer group name>]:Error 创建
A consumer group is a group of consumers that share the same group id. 
When a topic is consumed by consumers in the same group, every record 
will be delivered to only one consumer.
If all the consumer instances have the same consumer group, then the 
records will effectively be load-balanced over the consumer instances.
Service id identifies service implementation. As for example if we have 
multiple services like customers and orders - they will have different 
service id's. Most simple case here would be to use a short memorable
identifier like: "customers", "orders". 

The purpose of such id is to distinguish message handlers which can have
exactly the same name and can handle exactly the same message. Since these
are in different services - load balancing scenario is not acceptable in this case.

It can be also treated as a consumer group but in this case it applies to whole service.

组件生命周期

连接

连接对象注册为单例。但是消费者和发布者连接之间存在分离。这意味着单个服务实例可以连接到 RabbitMQ 服务器两次的最大。

IPublisher

使用单例生命周期。内部它使用具有 100_000 项固定大小的 Channel 集合。如果代码中的发布操作发生得比此集合处理得快得多,则您可能会收到具有原因:“InternalQueueIsFull”的 RabbitMqPublisherException。

在内部,发布者使用异步发布确认。这意味着每个 PublishAsync 实际上是(异步)等待服务器确认。有关更详细的说明,请参阅 RabbitMQ 文档:https://rabbitmq.cn/confirms.html#publisher-confirms 在这里,您还可以看到更多关于该行为的行为示例:https://rabbitmq.cn/tutorials/tutorial-seven-dotnet.html 总结一下:如果对每个 PublishAsync 方法进行等待,实际上是等待每种发布的确认,如本例所示

public async Task PublishMessagesAsync<T>(T [] messages)
{
    foreach (var message in messages)
    {
      try
      {
          await _publisher.PublishAsync(message);
      }
      catch (Exception e)
      {
          // handle publication errors
      }
    }
}

上面的例子可能相对较慢,但是实现起来非常简单。为了改善发布性能,您可以采取与本例类似的方案

public async Task PublishMessagesAsync<T>(T [] messages)
{
    var tasks = new Task[messages.Length];
    for (var i = 0; i < messages.Length; ++i)
    {
        var message = messages[i];
        tasks[i] = _publisher.PublishAsync(message);
    }
    
    foreach (var task in tasks)
    {
        try
        {
            await task;
        }
        catch (Exception e)
        {
            // handle publication errors
        }
    }
}

注意:如果在发布选项中不指定取消令牌(或指定默认值),或者没有指定超时,库将自动添加60秒超时。如果在此期间应用程序没有收到响应,则消息将标记为已取消。

中间件

每个中间件都作为瞬态添加。每个中间件都会在处理每个接收或发布的消息时创建。

消费者

每个消费者都是单例。但是,对于每个接收到的消息都会创建一个自己的作用域,并在处理结束后进行清理。

处理器

处理器以作用域实例的形式注册。

非常简单易用

定义消息

public class PingRequest
{
    public DateTime Timestamp { get; set; }
}

发布消息

public class PingController
{
    private readonly IPublisher _publisher;
    
    public PingController(IPublisher publisher)
    {
        _publisher = publisher;
    }
    
    public async Task SendPing()
    {
        await _publisher.PublishAsync(new PingRequest { Timestamp = DateTime.UtcNow; });
    }
}

定义消息消费者

public class PingRequestConsumer : IConsumer<PingRequest>
{
    public Task Handle(IMessage<PingRequest> message, CancellationToken cancellation)
    {
        Console.WriteLine(message.Body.Timestamp);
        return Task.CompletedTask;
    }
}

使用 ASP.NET Core 进行配置

services.AddStreamFlow(transport =>
{
    transport
        .UsingRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .StartConsumerHostedService()
        )
        .Consumers(builder => builder
            .Add<PingRequest, PingRequestConsumer>(options => options
                .ConsumerCount(5)
                .ConsumerGroup("gr1"))
            .Add<PingRequest, PingRequestConsumer>(options => options
                .ConsumerCount(5)
                .ConsumerGroup("gr2"))
        )
        .ConfigureConsumerPipe(builder => builder
            .Use<LogAppIdMiddleware>()
        )
        .ConfigurePublisherPipe(builder => builder
            .Use(_ => new SetAppIdMiddleware("Published from StreamFlow.Tests.AspNetCore"))
        );
});

上面的代码注册了流流程类,配置了 RabbitMQ 连接,并指示启动配置的 ASP.NET Core 托管服务,该服务启动了配置的消费者。它配置了 2 个消费者组,每个组启动 5 个实例。

ConfigureConsumerPipe 和 ConfigurePublisherPipe 注册了类似于中间件的操作,当接收到消息或发布时执行。这些中间件的代码

// used at consumer side to log received AppId
public class LogAppIdMiddleware : IStreamFlowMiddleware
{
    private readonly ILogger<LogAppIdMiddleware> _logger;

    public LogAppIdMiddleware(ILogger<LogAppIdMiddleware> logger)
    {
        _logger = logger;
    }

    public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        if (!string.IsNullOrWhiteSpace(context.AppId))
        {
            _logger.LogInformation("AppId: {AppId}", context.AppId);
        }

        return next(context);
    }
}

// used at publisher side to set custom AppId
public class SetAppIdMiddleware : IStreamFlowMiddleware
{
    private readonly string _appId;

    public SetAppIdMiddleware(string appId)
    {
        _appId = appId;
    }

    public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        context.WithAppId(_appId);
        return next(context);
    }
}

尽管这些中间件示例非常简单,但在实际情况下可以创建非常强大的实现

  • 重试
  • 异常处理
  • 日志记录
  • 度量
  • ...

MediatR

有一个用于 MediatR 库的实现,这可能会极大地简化消费者实现。

简单地安装包

Install-Package StreamFlow.RabbitMq.MediatR

库期望 MediatR 已经在依赖注入容器中配置并可用。

MediatR 通知

请求类必须实现 INotification 接口,例如

public class PingNotification : INotification
{
}

然后它可以在 StreamFlow 中使用

    ....
    .Consumers(builder => builder
        .AddNotification<PingNotification>()
    )
    ....

MediatR 请求(无响应)

请求类必须实现 IRequest 接口,例如

public class PingRequest : IRequest
{
}

然后它可以在 StreamFlow 中使用

    ....
    .Consumers(builder => builder
        .AddRequest<PingRequest>()
    )
    ....

MediatR 请求(带响应)

请求类必须实现 IRequest 接口,例如

public class PongResponse
{
}

public class PingPongRequest : IRequest<PongResponse>
{
}

然后它可以在 StreamFlow 中使用

    ....
    .Consumers(builder => builder
        .AddRequest<PingPongRequest, PongResponse>()
    )
    ....

然而,在这种情况下,您将获得额外的行为:响应将通过 RabbitMQ 发布者总线发送回。为了使其工作,您还需要启用发布主机(请参阅 EnablePublisherHost 调用)

services.AddStreamFlow(transport =>
{
    transport
        .UseRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .EnableConsumerHost(consumer => consumer.Prefetch(5))
            .WithPrometheusMetrics()
            .WithPublisherOptions(publisher => publisher
                .EnablePublisherHost()
            )
        )
}    

度量

库支持各种度量,代表着关于发布者和消费者的各种信息。默认情况下,它提供了 Prometheus 度量的实现,但它当然不仅限于它。

为了启用度量,请添加 Prometheus 包

PM> Install-Package StreamFlow.RabbitMq.Prometheus

并在配置 RabbitMQ 时启用度量(请参阅:WithPrometheusMetrics)

services.AddStreamFlow(streamFlowOptions, transport =>
{
    transport
        .UseRabbitMq(mq => mq
            .Connection(options.Host, options.Username, options.Password, options.VirtualHost)
            .WithPrometheusMetrics()
            ....
        );
});

Prometheus 度量

  • streamflow_messages_published

    直方图,表示发布消息的数量和持续时间(秒)

    标签:exchange,状态 [完成或失败]

  • streamflow_bus_messages_published

    直方图,表示使用发布者主机时发布消息的数量和持续时间(秒)

    标签:exchange,状态 [完成或失败]

  • streamflow_messages_publishing_events

    直方图,表示发布过程中发生的各种事件,如通道创建、准备、序列化、事务提交等,有助于查看发布过程中是否存在任何瓶颈

    标签:exchange,事件

  • streamflow_messages_publishing_errors

    计数器,表示消息发布过程中发生的异常

    标签:exchange

  • streamflow_bus_messages_publishing_errors

    计数器,表示使用发布者主机时消息发布过程中发生的异常

    标签:exchange

  • streamflow_messages_consumed

    直方图,表示消费消息的数量和消费者持续时间(秒)

    标签:交换,队列,状态

  • 流流量消息消费错误

    计数器,表示消费过程中遇到的异常

    标签:交换,队列

  • 流流量消息总线发布

    计数器,显示使用发布者主机发布的消息数量

  • 流流量消息总线发布错误

    计数器,表示通过发布者主机发布时的错误,由于发布者主机通道是有限的——当发布者主机接收的消息多于其能实际发送到RabbitMQ的消息时,该计数器将开始增加

  • 流流量发布者池大小

    计数器,在启用发布者池时,显示池的大小(已创建但当前未使用的发布者)

  • 流流量发布者池当前使用中

    计数器,在启用发布者池时,显示当前有多少发布者实例正在使用

错误处理

每个应用程序都要处理错误。我相当确定——RabbitMQ处理程序可能会遇到多个异常情况。在出现未处理的异常时,将创建IRabbitMqErrorHandler实例,这将默认将消息发送到错误队列。尽管您可以通过覆盖IRabbitMqConventions接口来定义自己的错误队列名称,但默认行为只是在消费者组队列的末尾添加“:错误”后缀,并将带有异常详细信息的消息发送到该队列。应用程序开发人员或支持人员可以稍后决定对这些消息做什么。

产品 兼容的和额外计算的目标框架版本。
.NET net6.0是兼容的。 net6.0-android已被计算。 net6.0-ios已被计算。 net6.0-maccatalyst已被计算。 net6.0-macos已被计算。 net6.0-tvos已被计算。 net6.0-windows已被计算。 net7.0已被计算。 net7.0-android已被计算。 net7.0-ios已被计算。 net7.0-maccatalyst已被计算。 net7.0-macos已被计算。 net7.0-tvos已被计算。 net7.0-windows已被计算。 net8.0已被计算。 net8.0-android已被计算。 net8.0-browser已被计算。 net8.0-ios已被计算。 net8.0-maccatalyst已被计算。 net8.0-macos已被计算。 net8.0-tvos已被计算。 net8.0-windows已被计算。
兼容的目标框架
包含的目标框架(在包中)
了解有关目标框架.NET Standard的更多信息。
  • net6.0

    • 无依赖项。

NuGet包 (2)

显示依赖于StreamFlow的前两个NuGet包

下载
StreamFlow.RabbitMq

RabbitMQ客户端包装器

StreamFlow.Outbox.EntityFrameworkCore

RabbitMQ客户端包装器

GitHub仓库

此包不被任何流行的GitHub仓库使用。

版本 下载 最后更新
9.1.0 8,648 4/19/2023
9.0.3 688 3/30/2023
9.0.2 273 3/29/2023
9.0.1 270 3/29/2023
9.0.0 284 3/29/2023
8.0.0 1,271 2/18/2023
7.4.0 484 2/11/2023
7.3.1 2,870 11/2/2022
7.3.0 812 10/28/2022
7.2.1 720 10/25/2022
7.2.0 651 10/25/2022
7.1.4 1,124 10/11/2022
7.1.3 696 10/11/2022
7.1.2 703 10/11/2022
7.1.1 694 10/10/2022
7.1.0 763 10/9/2022
7.0.0 663 10/9/2022
6.8.1 3,628 5/20/2022
6.8.0 693 5/20/2022
6.7.2 1,053 2/24/2022
6.7.1 929 1/23/2022
6.7.0 900 1/17/2022
6.6.1 807 1/13/2022
6.6.0 900 1/12/2022
6.5.1 404 12/20/2021
6.5.0 379 12/20/2021
6.4.0 385 12/20/2021
6.3.4 381 12/14/2021
6.3.3 378 12/14/2021
6.3.2 381 12/14/2021
6.3.1 392 12/14/2021
6.3.0 459 12/14/2021
6.2.1 406 12/9/2021
6.2.0 385 12/9/2021
6.1.2 384 12/8/2021
6.1.1 400 12/8/2021
6.1.0 412 12/8/2021
6.0.0 653 12/6/2021
5.2.0 670 11/23/2021
5.1.0 655 11/23/2021
5.0.0 420 11/14/2021
4.1.0 912 10/14/2021
4.0.3 805 9/20/2021
4.0.2 832 9/19/2021
4.0.1 769 9/19/2021
4.0.0 780 9/19/2021
3.9.0 834 9/19/2021
3.8.0 894 9/12/2021
3.7.0 866 9/12/2021
3.5.0 772 9/7/2021
3.4.0 769 9/7/2021
3.3.0 735 9/7/2021
3.2.0 735 9/7/2021
3.1.0 721 9/2/2021
3.0.0 369 9/1/2021
2.5.0 535 5/4/2021
2.3.0 355 5/4/2021
2.2.0 396 4/27/2021
2.1.0 385 4/26/2021
2.0.0 365 4/26/2021