7

我有一个 TPL 数据流块链,想观察系统内部某处的进展。

我知道我可以将 aTransformBlock插入我想要观察的网格中,让它发布到某种进度更新器,然后将消息原封不动地返回到下一个块。我不喜欢这个解决方案,因为该块纯粹是因为它的副作用而存在,而且我还必须在我想观察的任何地方更改块链接逻辑。

所以我想知道我是否可以ISourceBlock<T>.AsObservable用来观察网格内的消息传递,而无需更改它并且不消耗消息。如果可行的话,这似乎是一个更纯粹、更实用的解决方案。

从我对 Rx 的(有限)理解来看,这意味着我需要 observable 是热的而不是冷的,这样我的progress更新程序才能看到消息但不消耗它。并且.Publish().RefCount()似乎是使可观察到的热的方法。但是,它根本无法按预期工作 - 而是接收并使用block2每条progress消息。

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); 
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});

// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

结果是不确定的,但我得到了这样的混合:

block2:21
progress:22
progress:24
block2:23
progress:25

那么,我做错了什么,还是由于 TPL 数据流AsObservable的实现方式而这不可能?

我意识到我也可以用 Observable/Observer 对替换LinkTobetween block1and ,这可能会起作用,但下游是我首先使用 TPL Dataflow 的全部原因。block2LinkToBoundedCapacity = 1

编辑: 一些澄清:

  • 我确实打算BoundedCapacity=1在block2中设置。虽然在这个简单的示例中没有必要,但下游受限的情况是我发现 TPL 数据流真正有用的地方。
  • 为了澄清我在第二段中拒绝的解决方案,可以在 block1 和 block2 之间添加以下链接:

    var progressBlock = new TransformBlock<int, int>( i => {SomeUpdateProgressMethod(i); return i;});

  • 我还想保持背压,这样如果更上游的区块正在向其他同等工作人员分配工作,如果该链已经很忙block1,它就不会向其发送工作。block1

4

4 回答 4

4

您的代码的问题是您连接了block1. 然后,数据流只是给消费者首先出现的价值。

因此,您需要将值广播block1到另外两个块中,然后才能独立使用它们。

只是一个旁注,不要做.Publish().RefCount(),因为它不会按照你的想法做。它将有效地使一次运行仅可观察到,在该一次运行期间将允许多个观察者连接并查看相同的值。它与数据源无关,也与 Dataflow 块的交互方式无关。

试试这个代码:

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20);
var block_boadcast = new BroadcastBlock<int>(i => i, new DataflowBlockOptions());
var block_buffer = new System.Threading.Tasks.Dataflow.BufferBlock<int>();
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()));
var obs = block_buffer.AsObservable();
var l1 = block1.LinkTo(block_boadcast);
var l2 = block_boadcast.LinkTo(block2);
var l3 = block_boadcast.LinkTo(block_buffer);

// Progress
obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

这给了我:

块2:21
块2:22
块2:23
块2:24
块2:25
进度:21
进度:22
进度:23
进度:24
进度:25

这就是我认为你想要的。

现在,顺便说一句,为此使用 Rx 可能是一个更好的选择。它比任何 TPL 或 Dataflow 选项都更强大和更具声明性。

您的代码归结为:

Observable
    .Range(1, 5)
    .Select(i => i + 20)
    .Do(i => Debug.Print("progress:" + i.ToString()));
    .Subscribe(i => Debug.Print("block2:" + i.ToString()));

这几乎会给你同样的结果。

于 2017-06-16T05:23:51.437 回答
1

创建可观察数据流块时有两个选项需要考虑。您可以:

  1. 每次处理消息时发出通知,或
  2. 当存储在块的输出缓冲区中的先前处理的消息被链接块接受时发出通知。

两种选择都有优点和缺点。第一个选项提供及时但无序的通知。第二个选项提供有序但延迟的通知,并且还必须处理块到块链接的可处置性。当两个块之间的链接在块完成之前手动设置时,observable 会发生什么?

下面是第一个选项的实现,它创建了TransformBlock一个不消耗IObservable这个块的块。还有一个ActionBlock等效的实现,基于第一个实现(尽管它也可以通过复制粘贴和调整实现来独立TransformBlock实现,因为代码不多)。

public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var semaphore = new SemaphoreSlim(1);
    int startedIndexSeed = 0;
    int completedIndexSeed = 0;

    var notificationsBlock = new BufferBlock<(TInput, TOutput, int, int)>(
        new DataflowBlockOptions() { BoundedCapacity = 100 });

    var transformBlock = new TransformBlock<TInput, TOutput>(async item =>
    {
        var startedIndex = Interlocked.Increment(ref startedIndexSeed);
        var result = await transform(item).ConfigureAwait(false);
        await semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            // Send the notifications in synchronized fashion
            var completedIndex = Interlocked.Increment(ref completedIndexSeed);
            await notificationsBlock.SendAsync(
                (item, result, startedIndex, completedIndex)).ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release();
        }
        return result;
    }, dataflowBlockOptions);

    _ = transformBlock.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)notificationsBlock).Fault(t.Exception);
        else notificationsBlock.Complete();
    }, TaskScheduler.Default);

    observable = notificationsBlock.AsObservable();
    // A dummy subscription to prevent buffering in case of no external subscription.
    observable.Subscribe(
        DataflowBlock.NullTarget<(TInput, TOutput, int, int)>().AsObserver());
    return transformBlock;
}

// Overload with synchronous lambda
public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableTransformBlock(item => Task.FromResult(transform(item)),
        out observable, dataflowBlockOptions);
}

// ActionBlock equivalent (requires the System.Reactive package)
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Func<TInput, Task> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateObservableTransformBlock<TInput, object>(
        async item => { await action(item).ConfigureAwait(false); return null; },
        out var sourceObservable, dataflowBlockOptions);
    block.LinkTo(DataflowBlock.NullTarget<object>());
    observable = sourceObservable
        .Select(entry => (entry.Input, entry.StartedIndex, entry.CompletedIndex));
    return block;
}

// ActionBlock equivalent with synchronous lambda
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Action<TInput> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableActionBlock(
        item => { action(item); return Task.CompletedTask; },
        out observable, dataflowBlockOptions);
}

Windows 窗体中的使用示例:

private async void Button1_Click(object sender, EventArgs e)
{
    var block = CreateObservableTransformBlock((int i) => i + 20,
        out var observable,
        new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

    var vals = Enumerable.Range(1, 20).ToList();
    TextBox1.Clear();
    ProgressBar1.Value = 0;

    observable.ObserveOn(SynchronizationContext.Current).Subscribe(onNext: x =>
    {
        TextBox1.AppendText($"Value {x.Input} transformed to {x.Output}\r\n");
        ProgressBar1.Value = (x.CompletedIndex * 100) / vals.Count;
    }, onError: ex =>
    {
        TextBox1.AppendText($"An exception occured: {ex.Message}\r\n");
    },
    onCompleted: () =>
    {
        TextBox1.AppendText("The job completed successfully\r\n");
    });

    block.LinkTo(DataflowBlock.NullTarget<int>());

    foreach (var i in vals) await block.SendAsync(i);
    block.Complete();
}

在上面的例子中,observable变量的类型是:

IObservable<(int Input, int Output, int StartedIndex, int CompletedIndex)>

这两个指数是从 1 开始的。

于 2020-06-21T14:20:51.390 回答
0

尝试更换:

obs.ForEachAsync(i => Debug.Print("progressBlock:" + i.ToString()));

和:

obs.Subscribe(i => Debug.Print("progressBlock:" + i.ToString()));

我想该ForEachAsync方法没有正确连接/它正在触发,但是异步部分正在发生一些时髦的事情。

于 2017-06-16T03:37:40.600 回答
0

通过BoundedCapacity为链中的块指定 ,您创建了一些消息被目标块拒绝的情况,因为缓冲区ActionBlock已满,并且消息被拒绝。

通过从缓冲区块创建 observable,您确实提供了一个竞争条件:您的数据有两个消费者同时获取消息。块在TPL Dataflow将数据传播给第一个可用的消费者,这会导致您进入应用程序的不确定状态。

现在,回到你的问题。您可以引入 aBroadcastBlock因为它向所有消费者提供数据副本,而不仅仅是第一个消费者,但在这种情况下,您必须消除缓冲区大小限制,广播块就像电视频道,您无法获得上一个节目,您只有一个当前的。

旁注:你不检查Post方法的返回值,你可以考虑await SendAsync使用情况,为了更好的节流效果,设置BoundedCapacity为起点块,而不是中间块。

于 2017-06-16T04:27:19.063 回答