6

我使用 BufferBlock 和 ActionBlock 设置了生产者/消费者数据流块,它在控制台应用程序中运行良好;

将所有项目添加到 BurfferBlock 并将 BufferBlock 与其他 Action Items 链接后;它运行良好。

现在我想使用该内部服务,该数据流块管道将始终处于启动状态,并且当消息将通过外部事件可用时,它将进入缓冲区块并开始处理。我怎样才能做到这一点?

到目前为止,我已经完成了以下工作:

public void SetupPipeline()
{
    FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

    BufferBlock = new BufferBlock<WorkItem>();

    GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
    GroupingDataflowBlockOptions.Greedy = true;
    GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
    CancellationTokenSource = new CancellationTokenSource();
    CancellationToken = CancellationTokenSource.Token;
    GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
    BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);

    ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
        ProcessWorkItems(WorkItems.ToList<WorkItem>()),
        new ExecutionDataflowBlockOptions
      {
          CancellationToken = CancellationToken
      });

    Timer = new Timer(_ =>
            BatchBlock.TriggerBatch()
        );

    TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
    {
        Timer.Change(TimerInterval, Timeout.Infinite);
        logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
        return WorkItem;
    }, new ExecutionDataflowBlockOptions
    {
        CancellationToken = CancellationToken
    });

    BatchBlock.LinkTo(ProcessItems);
    TimingBlock.LinkTo(BatchBlock);
    BufferBlock.LinkTo(TimingBlock);
}
4

2 回答 2

2

您的批量大小由批处理块构造函数中的变量 'BoundingCapacity' 定义。批次将在以下情况下发布:

  • 已收到与批量大小相等的帖子数(在构造函数中指定)
  • 批处理块被标记为完成
  • 调用 triggerbatch 方法

似乎您希望在满足浴池大小或发生超时时发布一批。如果是这种情况,并且批量大小不重要,我真的会为您拥有的计时器添加一个循环间隔,并使批处理块下游的对象忽略空帖子。

您可能真正想要的,也是最符合数据流编程理念的,是在您开始发布一系列项目时创建一个新的批处理块,然后在完成或发生超时时完成它。如果新帖子尚不存在,则新帖子将创建一个新的批处理块。

尝试在仅基于第一个触发器触发的批处理块周围实现超时计时器的问题是,您要么需要计数并验证发送到缓冲区块的帖子,要么需要查看来自缓冲区块的帖子。这两种情况都会造成很多丑陋和/或违反块封装。

于 2013-12-16T19:45:24.120 回答
2

总体而言,DataFlow 是一种使用一组方法处理一堆对象的方法。它不提供或期望创建这些对象的任何特定方式。

如果您希望管道保持活动状态,请不要终止应用程序。如果您不想使用控制台应用程序,请创建一个服务来构建管道并将对象发送给它,直到它关闭。

消息只是您将通过读取数据、响应事件(无论这意味着什么)或任何其他方式创建的对象。

至于外部事件,你是什么意思?有人会将数据发送到您的应用程序吗?发生这种情况的方式有很多:

  • 如果数据来自另一个控制台应用程序,您可以将一个应用程序的结果通过管道传输到另一个应用程序,解析来自命令行应用程序输入流的数据,创建消息并将它们传递到管道
  • 如果您想要一个服务监听请求,您可以托管一个 .NET Pipe、WCF 或 Web API 服务来监听调用并将发布的数据传递到管道。
  • 如果数据来自数据库,您可以轮询更改并将任何更改的数据发送到管道。

关键是,Dataflow 是关于处理数据的,而不是关于监听事件的。它不是一个成熟的分布式代理系统,如果那是您正在寻找的。

于 2013-12-03T10:58:47.420 回答