例如,如何将记录的工作负载拆分到多个线程,特别是具有 169 条记录和 7 个线程的 Accuracer DB。
因为我可以只拆分范围内的记录数并让每个线程处理该范围。但是,如果用户删除或添加记录,它将无法正常工作。
例如,如何将记录的工作负载拆分到多个线程,特别是具有 169 条记录和 7 个线程的 Accuracer DB。
因为我可以只拆分范围内的记录数并让每个线程处理该范围。但是,如果用户删除或添加记录,它将无法正常工作。
您可以使用OmniThreadLibrary并行处理数据库中的记录,而无需太多麻烦。
我用Pipeline抽象写了一个例子。3个阶段的管道常量:
第二阶段处理传入的数据。
DoSomethingWith
仅浪费大约 100 毫秒的过程。模拟数据处理1
到输出队列中,以通知最后阶段已处理了另一条记录。此阶段配置为在 7 个线程中并行运行。
该示例是一个控制台应用程序,您只需复制/粘贴即可查看它在您的机器中的实时运行。
program Project1;
{$APPTYPE CONSOLE}
{$R *.res}
uses
System.SysUtils,
OtlCommon,
OtlCollections,
OtlParallel,
System.Diagnostics,
DB, DBClient;
type
//auxiliar container, used to copy the database data
//to avoid synchronization. remember TDataSet "current record"
//may cause conflicts if changed from different threads.
TContainer = class
private
FName: string;
FID: Int64;
public
property ID: Int64 read FID write FID;
property Name: string read FName write FName;
end;
//does nothing, but wastes around 100ms. "processing" each record
procedure DoSomethingWith(const AValue: TContainer);
begin
Sleep(100);
end;
//creates a DataSet on the fly with a random number of records
function CreateDataSet: TClientDataSet;
var
I: Integer;
begin
Result := TClientDataSet.Create(nil);
with Result.FieldDefs.AddFieldDef do
begin
Name := 'ID';
DataType := ftLargeint;
end;
with Result.FieldDefs.AddFieldDef do
begin
Name := 'NAME';
DataType := ftString;
end;
Result.CreateDataSet;
for I := 1 to Random(1000) do
Result.InsertRecord([I, 'Test']);
end;
var
RecordsProcessed: Integer;
SW: TStopwatch;
Data: TDataSet;
begin
IsMultiThread := True;
Randomize;
Writeln('wait while processing...');
SW := TStopwatch.Create;
SW.Start;
try
Data := CreateDataSet;
try
RecordsProcessed := Parallel.Pipeline
.Stage(
procedure (const Input, Output: IOmniBlockingCollection)
var
RecData: TContainer;
begin
Data.First;
while not Data.Eof do
begin
RecData := TContainer.Create;
RecData.ID := Data.Fields[0].AsLargeInt;
RecData.Name := Data.Fields[1].AsString;
Output.Add(RecData);
Data.Next;
end;
end)
.Stage(
procedure (const Input: TOmniValue; var Output: TOmniValue)
begin
//process the real thing here
DoSomethingWith(Input);
Input.AsObject.Free;
Output := 1; //another record
end)
.NumTasks(7) //this stage is processed by 7 parallel tasks
.Stage(
procedure (const Input, Output: IOmniBlockingCollection)
var
Recs: Integer;
Value: TOmniValue;
begin
Recs := 0;
for Value in Input do
Inc(Recs, Value);
Output.Add(Recs);
end)
.Run.Output.Next;
SW.Stop;
Writeln(RecordsProcessed, ' records processed in ', SW.ElapsedMilliseconds, 'ms.');
Writeln('Avg. ', (SW.ElapsedMilliseconds/RecordsProcessed):0:3, 'ms./record');
finally
Data.Free;
end;
except
on E: Exception do
Writeln(E.ClassName, ': ', E.Message);
end;
readln;
end.
恕我直言,这样做的主要优点是: