0

这个问题真的很难调试,并不总是发生(不会在短时间内发生,所以我可以轻松地调试代码)并且看起来没有人遇到过这样的类似问题?(我已经用谷歌搜索了几个小时,但没有找到与此问题相关的任何内容)。

简而言之,我的数据流网络在某些时候工作正常,直到我发现终端块(更新 UI)似乎停止工作(UI 上没有更新新数据),而所有向上的数据流块仍然工作正常. 所以这里的其他块和 ui 块之间好像有一些断开连接。

这是我详细的数据流网络,让我们先检查一下,然后再解释有关该问题的更多信息:

//the network graph first
[raw data block] 
-> [switching block] -> [data counting block]
                     -> [processing block] -> [ok result block] -> [completion monitoring]
                                           -> [not ok result block] -> [completion monitoring]

//in the UI code behind where I can consume the network and plug-in some other blocks for updating
//like this:
     [ok result block] -> [ok result counting block]
     [not ok result block] -> [other ui updating]

该块[ok result block]BroadcastBlock将结果推送到[ok result counting block]. 我在这里部分描述的问题是这[ok result counting block]似乎与[ok result block].

var options = new DataflowBlockOptions { EnsureOrdered = false };
var execOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 80 };

//[raw data block]
var rawDataBlock = new BufferBlock<Input>(options);

//[switching block]
var switchingBlock = new TransformManyBlock<Input,Input>(e => new[] {e,null});

//[data counting block]
var dataCountingBlock = new BroadcastBlock<Input>(null);

//[processing block]
var processingBlock = new TransformBlock<Input,int>(async e => {
    //call another api to compute the result
    var result = await …;
    //rollback the input for later processing (some kind of retry)
    if(result < 0){
       //per my logging, there is only one call dropping 
       //in this case
       Task.Run(rollback);
    }
    //local function to rollback
    async Task rollback(){
      await rawDataBlock.SendAsync(e).ConfigureAwait(false);
    }
    return result;
}, execOptions);

//[ok result block]
var okResultBlock = new BroadcastBlock<int>(null, options);

//[not ok result block]
var notOkResultBlock = new BroadcastBlock<int>(null, options);

//[completion monitoring]
var completionMonitoringBlock = new ActionBlock<int>(e => {
     if(rawDataBlock.Completion.IsCompleted && processingBlock.InputCount == 0){
          processingBlock.Complete();
     }
}, execOptions);

//connect the blocks to build the network
rawDataBlock.LinkTo(switchingBlock);
switchingBlock.LinkTo(processingBlock, e => e != null);
switchingBlock.LinkTo(dataCountingBlock, e => e == null);

processingBlock.LinkTo(okResultBlock, e => e >= 9);
processingBlock.LinkTo(notOkResultBlock, e => e < 9);

okResultBlock.LinkTo(completionMonitoringBlock);
notOkResultBlock.LinkTo(completionMonitoringBlock);

在后面的 UI 代码中,我插入了一些其他 UI 块来更新信息。我在这里使用WPF,但我认为在这里没关系:

var uiBlockOptions = new ExecutionDataflowBlockOptions {
     TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
};
dataCountingBlock.LinkTo(new ActionBlock<int>(e => {
      //these are properties in the VM class, which is bound to the UI (xaml view)
      RawInputCount++;
}, uiBlockOptions));

okResultBlock.LinkTo(new ActionBlock<int>(e => {
      //these are properties in the VM class, which is bound to the UI (xaml view)
      ProcessedCount++;
      OkResultCount++;
}, uiBlockOptions));

notOkResultBlock.LinkTo(new ActionBlock<int>(e => {
      //these are properties in the VM class, which is bound to the UI (xaml view)
      ProcessedCount++;
      PendingCount = processingBlock.InputCount;
}, uiBlockOptions));

我确实有代码监控块的完成状态:rawDataBlock, processingBlock, okResultBlock, notOkResultBlock. 我还有其他日志记录代码processingBlock来帮助诊断。

所以正如我所说,经过相当长的一段时间(大约 1 小时处理了大约 60 万个项目,实际上这个数字没有说明这个问题,它可能是随机的),网络似乎仍然运行良好,除了一些计数(好的结果, not ok result) 不会更新,就像okResultBlocknotOkResultBlock已与processingBlockOR 断开连接,或者它们已与 UI 块断开连接(这会更新 UI)。我确保它processingBlock仍在工作(没有记录异常并且结果仍然写入文件),dataCountingBlock仍然运行良好(在 UI 上更新了新计数),所有块processingBlock, okResultBlock,notOkResultBlock都没有完成(它们的完成是.ContinueWith一项任务它会注销状态并且没有记录)。

所以它真的卡在那里。我不知道为什么它会停止那样工作。这只有在我们使用像TPL Dataflow. 我知道您可能也很难诊断、想象和思考可能性。我只是在这里询问解决此问题的建议以及您的任何共享经验(关于类似问题),并可能对可能导致此类问题的一些猜测TPL Dataflow

更新

在我准备一些代码写下一些信息以帮助调试之前,我已经成功地再次重现了该错误。现在问题一直持续到这一点:processingBlock不知何故实际上并没有向所有链接的块(包括okResultBlockand )推送/发布/发送任何消息,甚至链接到它的新块(以假为notOkResultBlock前缀)也无法接收任何消息(结果)。正如我所说,它似乎仍然可以正常工作(它确实在内部运行代码并正常生成结果日志记录)。所以这仍然是一个非常奇怪的问题。DataflowLinkOptionsAppendprocessBlockAction

简而言之,现在的问题变成了为什么processBlock无法将其消息发送/发布到所有其他链接块?是否有任何可能的原因导致这种情况发生?如何知道块是否成功链接(在调用之后.LinkTo)?

4

1 回答 1

1

这实际上是我的错,processingBlock它实际上被阻止了,但它被正确地阻止并且以一种好的方式(按设计)。

processingBlock两个因素阻止:

  • 是(默认EnsureOrdered情况true下),因此输出始终按处理顺序排队。
  • 至少有一个输出结果不能被推出(到其他块)。

所以如果一个输出结果不能被推送出去,它就会成为阻塞项,因为所有输出结果都按处理顺序排队。所有处理后的输出结果将被第一个无法推出的输出结果简单地阻塞(排队)。

就我而言,此处无法推出的特殊输出结果是null结果。该空结果只能由某些错误(异常处理)产生。所以我有 2 个块okResultBlocknotOkResultBlock链接到processingBlock. 但是这两个块都被过滤,只让非空结果通过。抱歉,我的问题没有反映我所拥有的关于输出类型的确切代码。在问题中它只是一个简单int但实​​际上它是一个类(可为空),实际的链接代码如下所示:

processingBlock.LinkTo(okResultBlock, e => e != null && e.Point >= 9);
processingBlock.LinkTo(notOkResultBlock, e => e != null && e.Point < 9);

因此null输出结果将被阻塞,并因此阻塞所有处理后的结果(因为默认情况下该选项EnsureOrderedtrue

为了解决这个问题,我只是简单地设置EnsureOrderedfalse(虽然这不是避免阻塞所必需的,但在我的情况下它很好)并添加一个块来消耗null输出结果(这是帮助避免阻塞最重要的):

processingBlock.LinkTo(DataflowBlock.NullTarget<Output>(), e => e == null);
于 2020-08-10T11:48:16.837 回答