-3

多年前,在旧论坛上,我问 Primozh,Pipeline 模式是否可以像 Uroboros 一样,将半完整的结果反馈给它自己。

当时 Primozh 表示这将是直截了当的,PipeLine 阶段不仅可以将 OmniValues 提供给 OUTPUT,还可以提供给 INPUT。

问题是初始喂养阶段运行得太快,它们过期并密封输入集合并且没有办法解封它,因此一旦他们尝试将半生不熟的数据包送回自己 - 瞧!- OTL 抛出“无法添加到已完成的集合”异常。

那么上面链接的这个自爆任务,如何通过自喂Pipeline模式来实现呢?

UPD:将示例从“自爆炸” - 生成大量中间半计算结果 - 排列生成,更改为简单(我希望)计算阶乘。然而,这具有确定性的缺点:它总是生成一个中间作业项,因此管道处理不断增长的集合的能力没有被尝到。

{$A+} // not $A8
type FactTask = record
  Goal, Curr: Cardinal;
  Value : Int64;
end;

procedure TForm6.Button1Click(Sender: TObject);
var Msg: string;
    f: FactTask;
    Results: TArray<Int64>;
    pipeOut: IOmniBlockingCollection;
    pipe:    IOmniPipeline;
begin
  lblResults.Caption := ' WAIT, we are producing...';
  Repaint;

  pipe := Parallel.Pipeline;
  f.Goal := edLen.Value; // 10
  f.Curr := 0;
  f.Value := 1;

  pipe.Stage(
     procedure ( const input, output: IOmniBlockingCollection )
     begin
       output.Add( TOmniValue.FromRecord( f ) );
     end
  );

  pipe.Stage(
     procedure ( const input, output: IOmniBlockingCollection )
     var f_in, f_out: FactTask; v: TOmniValue;
     begin
       for v in input do begin
         f_in := v.ToRecord<FactTask>;
         if f_in.Curr < f_in.Goal then begin
            f_out.Goal := f_in.Goal;
            f_out.Curr := Succ(f_in.Curr);
            f_out.Value := f_in.Value * f_out.Curr;
            input.Add( TOmniValue.FromRecord( f_out ) ); //  <<< Exception!
         end;
       end;
     end
  );

  pipe.Stage(
     procedure ( const input, output: IOmniBlockingCollection )
     var f_in: FactTask;  v: TOmniValue;
     begin
       for v in input do begin
         f_in := v.ToRecord<FactTask>;
         if f_in.Curr = f_in.Goal then begin
            Output.Add( f_in.Value );
         end;
       end;
     end
  );

  pipe.Run;
  pipeOut := pipe.Output;
//    pipe.WaitFor(INFINITE);  ToArray would efficiently do that
//    pipeOut.CompleteAdding;    ...without frozing on Pipeline/Collections SetThrottle
  Results := TOmniBlockingCollection.ToArray<Int64>(pipeOut);

  Msg := IntToStr(f.Goal) + '! = ' + IntToStr(Results[0]);
  lblResults.Caption := Msg;
  ShowMessage(Msg);
end;

它在管道阶段尝试重新填充意外被密封的输入时崩溃TOmniPipeline.Run。在标记的行处,意外抛出了“无法添加到已完成的集合”异常。

当收集在空和少数之间平衡时如何保持管道运行(这不仅是开始条件,它会在计算结束附近重复)?

有点梦想:https
://plus.google.com/+AriochThe/posts/LCHnSCmZYtx 更多:https ://github.com/gabr42/OmniThreadLibrary/issues/61

4

1 回答 1

2

您的演示程序可以正常工作。

您的第一阶段只是将一条记录输出到其输出(顺便说一句,您可以通过写入到主线程中来做到这一点pipe.Input),然后它关闭其输出管道。

这反过来又导致第二阶段关闭。在for v in input退出之前,第二阶段通常会尝试写入input(除非您对时机非常幸运)。但是,它input已经关闭并Add引发了异常。调用TryAdd而不是Add将解决这个问题。

我猜这Pipeline并不是您真正要寻找的抽象,并且使用其他东西会更好。我会使用包装 a 的普通低级任务TOmniBlockingCollection(对于第 2 阶段)。您必须创建此阻塞集合,Create(1)以便它知道它正在喂食并自动解除阻塞。(有关更多信息,请参阅本书中的树示例中的并行搜索。)

于 2016-01-25T08:48:22.780 回答