Batching on duration or threshold using TPL Dataflow(使用TPL数据流对持续时间或阈值进行批处理)
问题描述
我已经使用TPL数据流实现了生产者..消费者模式。用例是代码从Kafka总线读取消息。为提高效率,我们需要在进入数据库时批量处理消息。
TPL数据流中是否有办法在达到大小或持续时间阈值时保留消息并触发?
例如,一旦消息从队列中拉出,当前实现将发布该消息。
postedSuccessfully = targetBuffer.Post(msg.Value);
推荐答案
已经可以通过System.Reactive特别是Buffer运算符按计数和持续时间进行缓冲。缓冲区收集传入事件,直到达到所需计数或其时间跨度过期。
数据流数据块旨在与System.Reactive配合使用。使用DataflowBlock.AsObservable()和AsObserver()扩展方法将can be converted块传递给可观测对象和观察者。这使得构建缓冲挡路变得非常容易:
public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
var inBlock = new BufferBlock<TIn>();
var outBlock = new BufferBlock<IList<TIn>>();
var outObserver=outBlock.AsObserver();
inBlock.AsObservable()
.Buffer(timeSpan, count)
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(outObserver);
return DataflowBlock.Encapsulate(inBlock, outBlock);
}
此方法使用两个缓冲块来缓冲输入和输出。Buffer()
在批处理已满或时间跨度过期时,从输入挡路(可观察对象)读取并写入输出挡路(观察者)。
默认情况下,Rx在当前线程上工作。通过调用ObserveOn(TaskPoolScheduler.Default)
,我们告诉它在任务池线程上处理数据。
示例
此代码创建5个项目或1秒的挡路缓冲区。它从发布7个项目开始,等待1.1秒,然后发布另外7个项目。每批都与线程ID一起写入控制台:
static async Task Main(string[] args)
{
//Build the pipeline
var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);
var options = new DataflowLinkOptions { PropagateCompletion = true };
var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
bufferBlock.LinkTo(printBlock, options);
//Start the messages
Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");
for (int i=0;i<7;i++)
{
bufferBlock.Post(i.ToString());
}
await Task.Delay(1100);
for (int i=7; i < 14; i++)
{
bufferBlock.Post(i.ToString());
}
bufferBlock.Complete();
Console.WriteLine($"Finishing");
await bufferBlock.Completion;
Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
Console.ReadKey();
}
static void printOut(IEnumerable<string> items)
{
var line = String.Join(",", items);
Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}
输出为:
Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
这篇关于使用TPL数据流对持续时间或阈值进行批处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:使用TPL数据流对持续时间或阈值进行批处理


- C# 中多线程网络服务器的模式 2022-01-01
- 带有服务/守护程序应用程序的 Microsoft Graph CSharp SDK 和 OneDrive for Business - 配额方面返回 null 2022-01-01
- 良好实践:如何重用 .csproj 和 .sln 文件来为 CI 创建 2022-01-01
- 如何用自己压缩一个 IEnumerable 2022-01-01
- C#MongoDB使用Builders查找派生对象 2022-09-04
- WebMatrix WebSecurity PasswordSalt 2022-01-01
- 在哪里可以找到使用中的C#/XML文档注释的好例子? 2022-01-01
- MoreLinq maxBy vs LINQ max + where 2022-01-01
- 输入按键事件处理程序 2022-01-01
- Web Api 中的 Swagger .netcore 3.1,使用 swagger UI 设置日期时间格式 2022-01-01