多年前,在旧论坛上,我问 Primozh,Pipeline 模式是否可以像 Uroboros 一样,将半完整的结果反馈给它自己。
当时 Primozh 表示这将是直截了当的,PipeLine 阶段不仅可以将 OmniValues 提供给 OUTPUT,还可以提供给 INPUT。
问题是初始喂养阶段运行得太快,它们过期并密封输入集合并且没有办法解封它,因此一旦他们尝试将半生不熟的数据包送回自己 - 瞧!- OTL 抛出“无法添加到已完成的集合”异常。
那么上面链接的这个自爆任务,如何通过自喂Pipeline
模式来实现呢?
UPD:将示例从“自爆炸” - 生成大量中间半计算结果 - 排列生成,更改为简单(我希望)计算阶乘。然而,这具有确定性的缺点:它总是生成一个中间作业项,因此管道处理不断增长的集合的能力没有被尝到。
{$A+} // not $A8
type FactTask = record
Goal, Curr: Cardinal;
Value : Int64;
end;
procedure TForm6.Button1Click(Sender: TObject);
var Msg: string;
f: FactTask;
Results: TArray<Int64>;
pipeOut: IOmniBlockingCollection;
pipe: IOmniPipeline;
begin
lblResults.Caption := ' WAIT, we are producing...';
Repaint;
pipe := Parallel.Pipeline;
f.Goal := edLen.Value; // 10
f.Curr := 0;
f.Value := 1;
pipe.Stage(
procedure ( const input, output: IOmniBlockingCollection )
begin
output.Add( TOmniValue.FromRecord( f ) );
end
);
pipe.Stage(
procedure ( const input, output: IOmniBlockingCollection )
var f_in, f_out: FactTask; v: TOmniValue;
begin
for v in input do begin
f_in := v.ToRecord<FactTask>;
if f_in.Curr < f_in.Goal then begin
f_out.Goal := f_in.Goal;
f_out.Curr := Succ(f_in.Curr);
f_out.Value := f_in.Value * f_out.Curr;
input.Add( TOmniValue.FromRecord( f_out ) ); // <<< Exception!
end;
end;
end
);
pipe.Stage(
procedure ( const input, output: IOmniBlockingCollection )
var f_in: FactTask; v: TOmniValue;
begin
for v in input do begin
f_in := v.ToRecord<FactTask>;
if f_in.Curr = f_in.Goal then begin
Output.Add( f_in.Value );
end;
end;
end
);
pipe.Run;
pipeOut := pipe.Output;
// pipe.WaitFor(INFINITE); ToArray would efficiently do that
// pipeOut.CompleteAdding; ...without frozing on Pipeline/Collections SetThrottle
Results := TOmniBlockingCollection.ToArray<Int64>(pipeOut);
Msg := IntToStr(f.Goal) + '! = ' + IntToStr(Results[0]);
lblResults.Caption := Msg;
ShowMessage(Msg);
end;
它在管道阶段尝试重新填充意外被密封的输入时崩溃TOmniPipeline.Run
。在标记的行处,意外抛出了“无法添加到已完成的集合”异常。
当收集在空和少数之间平衡时如何保持管道运行(这不仅是开始条件,它会在计算结束附近重复)?
有点梦想:https
://plus.google.com/+AriochThe/posts/LCHnSCmZYtx
更多:https ://github.com/gabr42/OmniThreadLibrary/issues/61