1

例如,如何将记录的工作负载拆分到多个线程,特别是具有 169 条记录和 7 个线程的 Accuracer DB。

因为我可以只拆分范围内的记录数并让每个线程处理该范围。但是,如果用户删除或添加记录,它将无法正常工作。

4

1 回答 1

3

您可以使用OmniThreadLibrary并行处理数据库中的记录,而无需太多麻烦。

我用Pipeline抽象写了一个例子。3个阶段的管道常量:

  1. 第一阶段从数据库中读取数据,创建容器对象的一个​​实例来代表管道下一阶段的数据。
  2. 第二阶段处理传入的数据。

    • 调用DoSomethingWith仅浪费大约 100 毫秒的过程。模拟数据处理
    • 释放容器实例的内存。
    • 然后将文字值添加1到输出队列中,以通知最后阶段已处理了另一条记录。

    此阶段配置为在 7 个线程中并行运行。

  3. 最后一个阶段只计算前一阶段完成了多少条记录

该示例是一个控制台应用程序,您只需复制/粘贴即可查看它在您的机器中的实时运行。

program Project1;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils,
  OtlCommon,
  OtlCollections,
  OtlParallel,
  System.Diagnostics,
  DB, DBClient;

type
  //auxiliar container, used to copy the database data
  //to avoid synchronization. remember TDataSet "current record"
  //may cause conflicts if changed from different threads.
  TContainer = class
  private
    FName: string;
    FID: Int64;
  public
    property ID: Int64 read FID write FID;
    property Name: string read FName write FName;
  end;

//does nothing, but wastes around 100ms. "processing" each record
procedure DoSomethingWith(const AValue: TContainer);
begin
  Sleep(100);
end;

//creates a DataSet on the fly with a random number of records
function CreateDataSet: TClientDataSet;
var
  I: Integer;
begin
  Result := TClientDataSet.Create(nil);
  with Result.FieldDefs.AddFieldDef do
  begin
    Name := 'ID';
    DataType := ftLargeint;
  end;
  with Result.FieldDefs.AddFieldDef do
  begin
    Name := 'NAME';
    DataType := ftString;
  end;
  Result.CreateDataSet;
  for I := 1 to Random(1000) do
    Result.InsertRecord([I, 'Test']);
end;

var
  RecordsProcessed: Integer;
  SW: TStopwatch;
  Data: TDataSet;
begin
  IsMultiThread := True;
  Randomize;
  Writeln('wait while processing...');
  SW := TStopwatch.Create;
  SW.Start;
  try
    Data := CreateDataSet;
    try
      RecordsProcessed := Parallel.Pipeline
        .Stage(
          procedure (const Input, Output: IOmniBlockingCollection)
          var
            RecData: TContainer;
          begin
            Data.First;
            while not Data.Eof do
            begin
              RecData := TContainer.Create;
              RecData.ID := Data.Fields[0].AsLargeInt;
              RecData.Name := Data.Fields[1].AsString;
              Output.Add(RecData);
              Data.Next;
            end;
          end)
        .Stage(
          procedure (const Input: TOmniValue; var Output: TOmniValue)
          begin
            //process the real thing here
            DoSomethingWith(Input);
            Input.AsObject.Free;
            Output := 1; //another record
          end)
        .NumTasks(7) //this stage is processed by 7 parallel tasks
        .Stage(
           procedure (const Input, Output: IOmniBlockingCollection)
           var
             Recs: Integer;
             Value: TOmniValue;
           begin
             Recs := 0;
             for Value in Input do
               Inc(Recs, Value);
             Output.Add(Recs);
           end)
        .Run.Output.Next;
      SW.Stop;
      Writeln(RecordsProcessed, ' records processed in ', SW.ElapsedMilliseconds, 'ms.');
      Writeln('Avg. ', (SW.ElapsedMilliseconds/RecordsProcessed):0:3, 'ms./record');
    finally
      Data.Free;
    end;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
  readln;
end.

恕我直言,这样做的主要优点是:

  • 你有一个灵活的机制在多个工人之间分配工作。如果某些记录需要更多时间来处理,图书馆会处理这种情况,您可以合理地期望在更短的时间内完成全部工作。
  • 一旦您完成从数据库中读取第一条记录,您的第一个处理线程就会启动。
  • 如果您必须等待基表中的更多传入记录,您可以轻松调整它。在阶段过程中的代码结束之前,阶段的输出队列不会被标记为完成。如果在某个时候没有更多工作要做,那么所有即将到来的阶段都会阻止等待更多数据处理。
  • 您只需更改参数值即可更改工作线程的数量!
于 2013-03-10T07:53:36.657 回答