1

我需要设计自定义数据流块,它的作用类似于缓冲区,但会在超时后使项目可用。我会将传入的消息放入队列并启动计时器。当计时器被触发时,我会将一个项目从队列移动到 BufferBlock 中,这将使其可供 Dataflow 消费者使用。

如果我将一个项目从内部队列移动到定时器处理程序的输出 BufferBlock 中,显然它不会是线程安全的,因为定时器处理程序可能与入队调用和损坏的队列发生冲突。MSDN 中声称数据流是基于 Actor 思想的,它假设消息是单线程执行的,从而解决了同步问题。但是如果我引入一个计时器处理程序,这将打破这个假设。我可以在队列上使用老派锁,或者使用 CuncurrentQueue 但我很好奇是否有更多特殊的数据流方式来管理计时器,以便它不会与数据流块的 Post() 调用冲突。

或者扩展这个问题,是否有一种优雅的方式让数据流块处理几种不同类型的消息并且仍然提供线程安全模型?

4

2 回答 2

2

我需要设计自定义数据流块,它的作用类似于缓冲区,但会在超时后使项目可用。

那么,像这样的事情?

public static TransformBlock<T, T> Delay<T>(Timespan delay)
{
  return new TransformBlock<T, T>(async x =>
  {
    await Task.Delay(delay);
    return x;
  },
  new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
}

如果您仍然认为需要自定义块,请务必阅读实现自定义数据流块的指南,其中描述了您需要担心的所有锁。

于 2013-10-25T13:32:54.747 回答
1

您引用的 MSDN 页面

因为运行时管理数据之间的依赖关系,您通常可以避免同步访问共享数据的要求。

这意味着当您在代码中使用数据流块时,您通常不必担心同步,因为这些块会为您完成这些

但是当您编写自定义数据流块时,您确实需要自己处理同步。例如,假设您正在实施BufferBlock. 调用Post()该块必须以某种方式同步,因为两个源块可以Post()同时调用。没有人会为你处理同步,所以你的Post()* 实现需要使用锁或ConcurrentQueue类似的东西。

* 实际上,你不执行Post(),你执行OfferMessage()

但是,如果我正确理解您的要求,您实际上可以通过利用 TDF 中已经存在的同步来实现您的块而无需任何手动同步。您将通过使用两个BufferBlocks、一个助手Task和来实现您的块DataflowBlock.Encapsulate()

public static IPropagatorBlock<T, T> CreateDelayedBlock<T>(TimeSpan delay)
{
    var source = new BufferBlock<T>();
    var target = new BufferBlock<T>();

    Task.Run(
        async () =>
        {
            while (await source.OutputAvailableAsync())
            {
                T item;
                if (source.TryReceive(out item))
                {
                    await Task.Delay(delay);
                    await target.SendAsync(item);
                }
                else
                {
                    // this shouldn't happen
                    // nobody else should be able to receive from source
                }
            }

            // TODO: if source failed, fail target
            target.Complete();
        });

    return DataflowBlock.Encapsulate(source, target);
}
于 2013-10-25T13:07:19.227 回答