Foundatio.Kafka 10.7.1
前缀已保留
dotnet add package Foundatio.Kafka --version 10.7.1
NuGet\Install-Package Foundatio.Kafka -Version 10.7.1
<PackageReference Include="Foundatio.Kafka" Version="10.7.1" />
paket add Foundatio.Kafka --version 10.7.1
#r "nuget: Foundatio.Kafka, 10.7.1"
// Install Foundatio.Kafka as a Cake Addin #addin nuget:?package=Foundatio.Kafka&version=10.7.1 // Install Foundatio.Kafka as a Cake Tool #tool nuget:?package=Foundatio.Kafka&version=10.7.1
构建松散耦合分布式应用程序的可插入基础块。
包括 Redis、Azure、AWS、RabbitMQ、Kafka 和内存中的实现(用于开发)。
为什么使用 Foundatio?
在构建多个大型云应用程序时,我们发现缺乏伟大的解决方案(不是要说没有解决方案),这些解决方案能够以简单的开发体验构建可扩展的分布式应用程序,同时许多关键组件。以下是为什么我们构建并使用 Foundatio 的几个例子
- 希望对抽象接口进行构建,以便我们能够轻松地更改实现。
- 希望这些块能够友好地使用依赖注入。
- 缓存:我们最初使用一个开源 Redis 缓存客户端,但后来它变成了一种具有高许可成本的商业产品。不仅如此,还没有内存中的实现,因此每个开发人员都需要设置和配置 Redis。
- 消息总线:我们最初考虑过NServiceBus(一个好产品),但它有高昂的许可费用(他们需要吃饭),而且不友好开源软件。我们还考虑了MassTransit(另一个好产品),但当时发现Azure支持不足,并且本地设置很麻烦(对于内存中)。我们想要一个简单的消息总线,只需要在本地或云中运行。
- 存储:我们没有找到任何现有项目支持解耦并且在内存、文件存储或Azure Blob存储中。
总的来说,如果您想要无痛苦的开发和测试,同时允许您的应用程序扩展,请使用Foundatio!
实现
- Redis - 缓存、存储、队列、消息、锁、指标
- Azure Storage - 存储、队列
- Azure ServiceBus - 队列、消息
- AWS - 存储、队列、指标
- Kafka - 消息
- RabbitMQ - 消息
- Minio - 存储
- Aliyun - 存储
- SshNet - 存储
入门(开发)
Foundatio可以通过NuGet包管理器安装,链接在此安装Foundatio。如果您需要帮助,请创建一个问题或者加入我们的Discord聊天室。如果您有任何问题,我们将一直在这里帮忙!
本节仅用于开发目的!如果您正在尝试使用Foundatio库,请从NuGet获取它们。
- 您必须安装Visual Studio Code。
- 打开
Foundatio.sln
Visual Studio解决方案文件。
使用Foundatio
以下各节包含Foundatio能实现的一小部分功能。我们建议您查看源代码以获取更多信息。如果您有任何问题或需要帮助,请告诉我们!
缓存
缓存让您能够以闪电般的速度存储和访问数据,从而节省您创建或获取数据的昂贵操作。我们提供了四种不同的缓存实现,它们基于接口
InMemoryCacheClient
:一个内存缓存客户端实现。这个缓存实现仅在进程的生命周期内有效。值得注意的是,内存缓存客户端具有通过MaxItems
属性缓存最后X项的能力。我们使用这个在Exceptionless中,只保存最后250个解析的geoip结果。HybridCacheClient
:这个缓存实现同时使用和
InMemoryCacheClient
,并使用IMessageBus
在进程间同步缓存。如果项存在于本地缓存中,这可以带来巨大的性能提升,因为您可以节省一个序列化操作和对远程缓存的调用。RedisCacheClient
:Redis缓存客户端实现。- RedisHybridCacheClient:一个实现了
HybridCacheClient
的类,使用RedisCacheClient
作为ICacheClient
,并使用RedisMessageBus
作为IMessageBus
。 - ScopedCacheClient:此缓存实现接受一个
ICacheClient
的实例和一个字符串scope
。此作用域将作为前缀添加到每个缓存键上。这使得它可以很容易地对所有缓存键进行范围划分并且轻松删除。
示例
using Foundatio.Caching;
ICacheClient cache = new InMemoryCacheClient();
await cache.SetAsync("test", 1);
var value = await cache.GetAsync<int>("test");
队列
列队提供先进先出(FIFO)消息发送。我们提供了四个不同类型的列队实现,这些实现源于IQueue
接口
- InMemoryQueue:内存队列实现。此队列实现仅对进程的生命周期有效。
- RedisQueue:Redis队列实现。
- AzureServiceBusQueue:Azure Service Bus 队列实现。
- AzureStorageQueue:Azure Storage 队列实现。
- SQSQueue:AWS SQS实现。
示例
using Foundatio.Queues;
IQueue<SimpleWorkItem> queue = new InMemoryQueue<SimpleWorkItem>();
await queue.EnqueueAsync(new SimpleWorkItem {
Data = "Hello"
});
var workItem = await queue.DequeueAsync();
锁
锁可以确保资源在任何给定时间只被一个消费者访问。我们提供了两种不同的锁实现,这些实现源于ILockProvider
接口
- CacheLockProvider:使用缓存进行进程间通信的锁实现。
- ThrottlingLockProvider:仅允许一定数量的锁通过的锁实现。您可以将其用于限制对某些外部服务的API调用的数量,它将在所有请求该锁的过程中进行限制。
- ScopedLockProvider:此锁实现接受一个
ILockProvider
的实例和一个字符串scope
。此作用域将作为前缀添加到每个锁键上。这使得它可以很容易地进行范围划分和释放。
值得注意的是,所有锁提供者都使用了ICacheClient
。这可以让你确保你的代码可以在多台机器上正确加锁。
示例
using Foundatio.Lock;
ILockProvider locker = new CacheLockProvider(new InMemoryCacheClient(), new InMemoryMessageBus());
var testLock = await locker.AcquireAsync("test");
// ...
await testLock.ReleaseAsync();
ILockProvider throttledLocker = new ThrottlingLockProvider(new InMemoryCacheClient(), 1, TimeSpan.FromMinutes(1));
var throttledLock = await throttledLocker.AcquireAsync("test");
// ...
await throttledLock.ReleaseAsync();
消息传递
允许你在应用程序流通过的消息上发布和订阅。我们提供了四种不同的消息总线实现,这些实现源于IMessageBus
接口
- InMemoryMessageBus:内存消息总线实现。此消息总线实现仅对进程的生命周期有效。
- RedisMessageBus:Redis消息总线实现。
- RabbitMQMessageBus:RabbitMQ实现。
- KafkaMessageBus:Kafka实现。
- AzureServiceBusMessageBus:Azure Service Bus实现。
示例
using Foundatio.Messaging;
IMessageBus messageBus = new InMemoryMessageBus();
await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
// Got message
});
await messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" });
作业
允许你运行长期运行的过程(进程内或进程外)而不用担心其被过早终止。根据你的用例,我们有三种不同的方式来定义工作方式。
- 工作:所有工作必须从
IJob
接口 继承。我们还提供了一个JobBase
基类,您可以从它继承,它提供了作业上下文和日志记录。您可以通过在作业上调用RunAsync()
或创建一个JobRunner
类 的实例并调用其中的一个运行方法来运行作业。JobRunner 可以用来轻松地将您的作业作为 Azure Web Jobs 运行。
示例
using Foundatio.Jobs;
public class HelloWorldJob : JobBase {
public int RunCount { get; set; }
protected override Task<JobResult> RunInternalAsync(JobContext context) {
RunCount++;
return Task.FromResult(JobResult.Success);
}
}
var job = new HelloWorldJob();
await job.RunAsync(); // job.RunCount = 1;
await job.RunContinuousAsync(iterationLimit: 2); // job.RunCount = 3;
await job.RunContinuousAsync(cancellationToken: new CancellationTokenSource(10).Token); // job.RunCount > 10;
- 队列处理器工作:队列处理器作业非常适合处理将由队列数据驱动的作业。队列处理器工作必须从
QueueJobBase<T>
类 继承。然后您可以通过在作业上调用RunAsync()
或将其传递到JobRunner
类 来运行作业。JobRunner 可以用来轻松地将您的作业作为 Azure Web Jobs 运行。
示例
using Foundatio.Jobs;
public class HelloWorldQueueJob : QueueJobBase<HelloWorldQueueItem> {
public int RunCount { get; set; }
public HelloWorldQueueJob(IQueue<HelloWorldQueueItem> queue) : base(queue) {}
protected override Task<JobResult> ProcessQueueEntryAsync(QueueEntryContext<HelloWorldQueueItem> context) {
RunCount++;
return Task.FromResult(JobResult.Success);
}
}
public class HelloWorldQueueItem {
public string Message { get; set; }
}
// Register the queue for HelloWorldQueueItem.
container.AddSingleton<IQueue<HelloWorldQueueItem>>(s => new InMemoryQueue<HelloWorldQueueItem>());
// To trigger the job we need to queue the HelloWorldWorkItem message.
// This assumes that we injected an instance of IQueue<HelloWorldWorkItem> queue
IJob job = new HelloWorldQueueJob();
await job.RunAsync(); // job.RunCount = 0; The RunCount wasn't incremented because we didn't enqueue any data.
await queue.EnqueueAsync(new HelloWorldWorkItem { Message = "Hello World" });
await job.RunAsync(); // job.RunCount = 1;
await queue.EnqueueAsync(new HelloWorldWorkItem { Message = "Hello World" });
await queue.EnqueueAsync(new HelloWorldWorkItem { Message = "Hello World" });
await job.RunUntilEmptyAsync(); // job.RunCount = 3;
- 工作项工作:工作项作业将在工作池中与其它工作项作业一起运行。这种类型的工作非常适合不常发生但应该在工作项中完成的事物(例如:删除具有多个子实体的实体)。当您在
消息总线
上发布消息时,将触发该作业。作业必须从WorkItemHandlerBase
类 继承。然后您可以通过JobRunner
类 运行所有共享作业。JobRunner 可以用来轻松地将您的作业作为 Azure Web Jobs 运行。
示例
using System.Threading.Tasks;
using Foundatio.Jobs;
public class HelloWorldWorkItemHandler : WorkItemHandlerBase {
public override async Task HandleItemAsync(WorkItemContext ctx) {
var workItem = ctx.GetData<HelloWorldWorkItem>();
// We can report the progress over the message bus easily.
// To receive these messages just inject IMessageSubscriber
// and Subscribe to messages of type WorkItemStatus
await ctx.ReportProgressAsync(0, "Starting Hello World Job");
await Task.Delay(TimeSpan.FromSeconds(2.5));
await ctx.ReportProgressAsync(50, "Reading value");
await Task.Delay(TimeSpan.FromSeconds(.5));
await ctx.ReportProgressAsync(70, "Reading value");
await Task.Delay(TimeSpan.FromSeconds(.5));
await ctx.ReportProgressAsync(90, "Reading value.");
await Task.Delay(TimeSpan.FromSeconds(.5));
await ctx.ReportProgressAsync(100, workItem.Message);
}
}
public class HelloWorldWorkItem {
public string Message { get; set; }
}
// Register the shared job.
var handlers = new WorkItemHandlers();
handlers.Register<HelloWorldWorkItem, HelloWorldWorkItemHandler>();
// Register the handlers with dependency injection.
container.AddSingleton(handlers);
// Register the queue for WorkItemData.
container.AddSingleton<IQueue<WorkItemData>>(s => new InMemoryQueue<WorkItemData>());
// The job runner will automatically look for and run all registered WorkItemHandlers.
new JobRunner(container.GetRequiredService<WorkItemJob>(), instanceCount: 2).RunInBackground();
// To trigger the job we need to queue the HelloWorldWorkItem message.
// This assumes that we injected an instance of IQueue<WorkItemData> queue
// NOTE: You may have noticed that HelloWorldWorkItem doesn't derive from WorkItemData.
// Foundatio has an extension method that takes the model you post and serializes it to the
// WorkItemData.Data property.
await queue.EnqueueAsync(new HelloWorldWorkItem { Message = "Hello World" });
文件存储
我们提供了不同的文件存储实现,它们都从 IFileStorage
接口 继承。
- InMemoryFileStorage:内存中文件实现。此文件存储实现仅在进程的生命周期内有效。
- FolderFileStorage:使用硬盘进行存储的文件存储实现。
- AzureFileStorage:Azure Blob 存储实现。
- S3FileStorage:AWS S3 文件存储实现。
- RedisFileStorage:Redis 文件存储实现。
- MinioFileStorage:Minio 文件存储实现。
- AliyunFileStorage:Aliyun 文件存储实现。
- SshNetFileStorage:SFTP 文件存储实现。
我们建议将所有 IFileStorage
实现作为单例使用。
示例
using Foundatio.Storage;
IFileStorage storage = new InMemoryFileStorage();
await storage.SaveFileAsync("test.txt", "test");
string content = await storage.GetFileContentsAsync("test.txt")
指标
我们提供了五个从 IMetricsClient
接口 继承的实现。
- InMemoryMetricsClient:内存中指标实现。
- RedisMetricsClient:一个Redis指标实现。
- StatsDMetricsClient:一个statsd指标实现。
- MetricsNETClient:一个Metrics.NET实现。
- AppMetricsClient:一个AppMetrics实现。
- CloudWatchMetricsClient:一个AWS CloudWatch实现。
我们建议使用所有的IMetricsClient
实现作为单例。
示例
IMetricsClient metrics = new InMemoryMetricsClient();
metrics.Counter("c1");
metrics.Gauge("g1", 2.534);
metrics.Timer("t1", 50788);
示例应用
我们提供幻灯片和示例应用,展示了如何使用Foundatio。
感谢所有做出贡献的人们
产品 | 版本 兼容的和额外的计算目标框架版本。 |
---|---|
.NET | net5.0 已计算。 net5.0-windows 已计算。 net6.0 已计算。 net6.0-android 已计算。 net6.0-ios 已计算。 net6.0-maccatalyst 已计算。 net6.0-macos 已计算。 net6.0-tvos 已计算。 net6.0-windows 已计算。 net7.0 已计算。 net7.0-android 已计算。 net7.0-ios 已计算。 net7.0-maccatalyst 已计算。 net7.0-macos 已计算。 net7.0-tvos 已计算。 net7.0-windows 已计算。 net8.0 已计算。 net8.0-android 已计算。 net8.0-browser 已计算。 net8.0-ios 已计算。 net8.0-maccatalyst 已计算。 net8.0-macos 已计算。 net8.0-tvos 已计算。 net8.0-windows 已计算。 |
.NET Core | netcoreapp2.0 已计算。 netcoreapp2.1 已计算。 netcoreapp2.2 已计算。 netcoreapp3.0 已计算。 netcoreapp3.1 已计算。 |
.NET Standard | netstandard2.0 兼容。 netstandard2.1 已计算。 |
.NET Framework | net461 已计算。 net462 已计算。 net463 已计算。 net47 已计算。 net471 已计算。 net472 已计算。 net48 已计算。 net481 已计算。 |
MonoAndroid | monoandroid 已计算。 |
MonoMac | monomac 已计算。 |
MonoTouch | monotouch 已计算。 |
Tizen | tizen40 已计算。 tizen60 已计算。 |
Xamarin.iOS | xamarinios 已计算。 |
Xamarin.Mac | xamarinmac 已计算。 |
Xamarin.TVOS | xamarintvos 已计算。 |
Xamarin.WatchOS | xamarinwatchos 已计算。 |
-
.NETStandard 2.0
- Confluent.Kafka (>= 2.3.0)
- Foundatio (>= 10.7.1)
NuGet 包
此包没有被任何 NuGet 包使用。
GitHub 仓库
此包没有被任何流行的 GitHub 仓库使用。
版本 | 下载 | 最后更新 |
---|---|---|
10.7.1 | 510 | 3/27/2024 |
10.7.0 | 417 | 1/5/2024 |
10.6.1 | 744 | 6/23/2023 |
10.6.0 | 783 | 1/1/2023 |
10.5.1-beta1 | 126 | 6/9/2022 |