0

注意:

原帖标题

为什么来自 DWScript 的多线程 JSON 解析器不随线程数扩展?

已更改,因为此问题与使用 DWScript 处理 JSON 数据无关。问题出在 Delphi XE2 到 XE7 的默认内存管理器中(测试为 XE2 和试用 XE7),但问题首先出现在此类应用程序中。


我有在 Delphi XE2 中处理 JSON 数据的多线程 Win32/Win64 vcl 应用程序。

每个线程使用 DWScript 解析 JSON 数据TdwsJSONValue.ParseString(sJSON),使用 DWScript 方法读取值并将结果存储为记录。

出于测试目的,我在每个线程中处理相同的 JSON 数据。

单次线程运行需要N几秒钟的时间来处理数据。将线程数增加到M线性(大约M * N)会增加处理相同数据所需的单个线程内的时间。

结果没有速度提高。此应用程序的其他部分(JSON 数据传输、在目标环境中存储结果) - 按预期扩展。

可能是什么原因?任何想法表示赞赏。

补充资料:

  1. 在 Win7/32 和 Win7/64、Win8/64 从 2 核到 12 核(w/w-out HT)系统上测试

  2. DWScript 被选为最快的可用(测试了一堆,其中:Superobject,内置 Delphi)。SO 的行为类似于来自 DWS 的 JSON 单元。

  3. 下面是说明问题的完整控制台应用程序。要运行它,我们需要此处提供的示例 json 数据:https ://www.dropbox.com/s/4iuv87ytpcdugk6/json1.zip?dl=0此文件包含json1.dat第一个线程的数据。对于多达 16 个线程,只需将 json1.dat 复制到 json2.dat...json16.dat。

    程序和数据应该在同一个文件夹中。运行:convert.exe N,其中 N 是线程数。

    程序以毫秒为单位将执行时间写入 stout - 在线程中花费的时间、解析数据的时间和释放(销毁)TdwsJSONValue 对象的时间。声明_dwsjvData.Destroy;不缩放。


program Convert;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils,
  System.Diagnostics,
  System.Classes,
  dwsJSON in 'dwsJSON.pas',
  dwsStrings in 'dwsStrings.pas',
  dwsUtils in 'dwsUtils.pas',
  dwsXPlatform in 'dwsXPlatform.pas';

type

  TWorkerThread = class (TThread)
  private
    _iUid:  Integer;
    _swWatch:  TStopwatch;
    _lRunning:  Boolean;

    _sFileJSonData:  String;
    _fJsonData:  TextFile;

  protected
    constructor Create (AUid: Integer);
    procedure Execute; override;

  published
    property Running: Boolean read _lRunning;

  end;

  TConverter = class (TObject)
  private
    _swWatch0, _swWatch1, _swWatch2:  TStopwatch;

    _dwsjvData:  TdwsJSONValue;

  protected
    constructor Create;
    destructor Destroy; override;

    function Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
  end;

const
  MAX_THREADS = 16;

var
  iHowMany:  Integer;
  athWorker:  array [1..MAX_THREADS] of Pointer;
  aiElapsed:  array [1..MAX_THREADS] of Integer;
  aiElapsedParse:  array [1..MAX_THREADS] of Integer;
  aiElapsedDestroy:  array [1..MAX_THREADS] of Integer;
  aiFares:  array [1..MAX_THREADS] of Integer;
  swWatchT, swWatchP:  TStopwatch;


constructor TWorkerThread.Create (AUid: Integer);
begin
  inherited Create (True);

  _iUid := AUid;
  _swWatch := TStopwatch.Create;
  _sFileJSonData := ExtractFilePath (ParamStr (0)) + 'json' + Trim (IntToStr (_iUid)) + '.dat';

  _lRunning := False;

  Suspended := False;
end;

procedure TWorkerThread.Execute;
var
  j:  Integer;
  sLine:  String;
  slLines:  TStringList;

  oS:  TConverter;
begin
  _lRunning := True;

  oS := TConverter.Create;

  slLines := TStringList.Create;
  System.AssignFile (_fJsonData, _sFileJSonData);
  System.Reset (_fJsonData);
  j := 0;
  repeat
    System.Readln (_fJsonData, sLine);
    slLines.Add (sLine);
    Inc (j);
  until (j = 50);
//  until (System.Eof (_fJsonData));
  System.Close (_fJsonData);

  Sleep (1000);

  _swWatch.Reset;
  _swWatch.Start;

  aiFares [_iUid] := 0;
  aiElapsedParse [_iUid] := 0;
  aiElapsedDestroy [_iUid] := 0;
  for j := 1 to slLines.Count do
    aiFares [_iUid] := aiFares [_iUid] + oS.Calculate (_iUid, slLines.Strings [j - 1], aiElapsedParse [_iUid], aiElapsedDestroy [_iUid]);

  _swWatch.Stop;

  slLines.Free;
  os.Destroy;

  aiElapsed [_iUid] := _swWatch.ElapsedMilliseconds;

  _lRunning := False;
end;

constructor TConverter.Create;
begin
  inherited Create;

  _swWatch0 := TStopwatch.Create;
  _swWatch1 := TStopwatch.Create;
  _swWatch2 := TStopwatch.Create;
end;

destructor TConverter.Destroy;
begin
  inherited;
end;

function TConverter.Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
var
  jFare, jTotalFares, iElapsedParse, iElapsedDestroy, iElapsedTotal:  Integer;
begin
  _swWatch0.Reset;
  _swWatch0.Start;

  _swWatch1.Reset;
  _swWatch1.Start;
  _dwsjvData := TdwsJSONValue.ParseString (AJSonData);
  _swWatch1.Stop;
  iElapsedParse := _swWatch1.ElapsedMilliseconds;

  if (_dwsjvData.ValueType = jvtArray) then
  begin
    _swWatch2.Reset;
    _swWatch2.Start;

    jTotalFares := _dwsjvData.ElementCount;
    for jFare := 0 to (jTotalFares - 1) do
      if (_dwsjvData.Elements [jFare].ValueType = jvtObject) then
      begin

        _swWatch1.Reset;
        _swWatch1.Start;

        _swWatch1.Stop;
      end;
  end;

  _swWatch1.Reset;
  _swWatch1.Start;
  _dwsjvData.Destroy;
  _swWatch1.Stop;
  iElapsedDestroy := _swWatch1.ElapsedMilliseconds;

  _swWatch0.Stop;
  iElapsedTotal := _swWatch0.ElapsedMilliseconds;

  Inc (AParse, iElapsedParse);
  Inc (ADestroy, iElapsedDestroy);

  result := jTotalFares;
end;

procedure MultithreadStart;
var
  j:  Integer;
begin
  for j := 1 to iHowMany do
    if (athWorker [j] = nil) then
    begin
      athWorker [j] := TWorkerThread.Create (j);

      TWorkerThread (athWorker [j]).FreeOnTerminate := False;
      TWorkerThread (athWorker [j]).Priority := tpNormal;
    end;
end;

procedure MultithreadStop;
var
  j:  Integer;
begin
  for j := 1 to MAX_THREADS do
    if (athWorker [j] <> nil) then
    begin
      TWorkerThread (athWorker [j]).Terminate;
      TWorkerThread (athWorker [j]).WaitFor;

      TWorkerThread (athWorker [j]).Free;
      athWorker [j] := nil;
    end;
end;

procedure Prologue;
var
  j:  Integer;
begin
  iHowMany := StrToInt (ParamStr (1));

  for j := 1 to MAX_THREADS do
    athWorker [j] := nil;

  swWatchT := TStopwatch.Create;
  swWatchT.Reset;

  swWatchP := TStopwatch.Create;
  swWatchP.Reset;
end;

procedure RunConvert;

  function __IsRunning: Boolean;
  var
    j:  Integer;
  begin
    result := False;
    for j := 1 to MAX_THREADS do
      result := result or ((athWorker [j] <> nil) and TWorkerThread (athWorker [j]).Running);
  end;

begin

  swWatchT.Start;

  MultithreadStart;

  Sleep (1000);
  while (__isRunning) do
    Sleep (500);

  MultithreadStop;

  swWatchT.Stop;
  Writeln (#13#10, 'Total time:', swWatchT.ElapsedMilliseconds);
end;

procedure Epilogue;
var
  j:  Integer;
begin
  for j := 1 to iHowMany do
    Writeln ( #13#10, 'Thread # ', j, '  tot.time:', aiElapsed [j], '  fares:', aiFares [j], '  tot.parse:', aiElapsedParse [j], '  tot.destroy:', aiElapsedDestroy [j]);

  Readln;
end;

begin
  try
    Prologue;
    RunConvert;
    Epilogue;

  except
    on E: Exception do
      Writeln (E.ClassName, ': ', E.Message);
  end;
end.
4

3 回答 3

1

你试过我的可扩展内存管理器吗?因为 Delphi(内部使用 fastmm)不能很好地扩展字符串和其他与内存相关的东西: https ://scalemm.googlecode.com/files/ScaleMM_v2_4_1.zip

您还可以尝试我的探查器的两种探查器模式,看看哪一部分是瓶颈: https ://code.google.com/p/asmprofiler/

于 2014-11-17T12:36:39.837 回答
1

解决方案是将默认的 Delphi XE2 或 XE7 内存管理器与英特尔® Threading Building Blocks 内存管理器交换。在示例应用程序中,它可以缩放 ca。当 app 为 64 位时,线程数最多为 16 的线性。

update: with assumption that number of threads running is less than number of cores

这是在从 2cores/4ht 到 12cores/24ht 运行 KVM 虚拟化 Windows 7 和 124GB RAM 的机器上测试的

有趣的是对 Win 7 进行虚拟化。内存分配和释放速度比原生 Win 7 快 2 倍。

结论:如果您在多线程(超过 4-8 个线程)应用程序的线程中执行大量 10kB-10MB 块的内存分配/释放操作 - 请仅使用英特尔的内存管理器。

@André:感谢您为我指明正确方向的提示!

这是用于测试的带有 TBB 内存管理器的单元,它必须在主项目文件 .dpr 的单元列表中显示为第一个

unit TBBMem;

interface

function  ScalableGetMem  (ASize: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_malloc';
procedure ScalableFreeMem (APtr: Pointer); cdecl; external 'tbbmalloc' name 'scalable_free';
function  ScalableReAlloc (APtr: Pointer; Size: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_realloc';

implementation

Function TBBGetMem (ASize: Integer): Pointer;
begin
  result := ScalableGetMem (ASize);
end;

Function TBBFreeMem (APtr: Pointer): Integer;
begin
  ScalableFreeMem (APtr);
  result := 0;
end;

Function TBBReAllocMem (APtr: Pointer; ASize: Integer): Pointer;
begin
  result := ScalableRealloc (APtr, ASize);
end;

const
  TBBMemoryManager:  TMemoryManager = ( GetMem: TBBGetmem;
                                        FreeMem: TBBFreeMem;
                                        ReAllocMem:  TBBReAllocMem; );
var
  oldMemoryManager:  TMemoryManager;

initialization
  GetMemoryManager (oldMemoryManager);
  SetMemoryManager (TBBMemoryManager);

finalization
  SetMemoryManager (oldMemoryManager);

end.
于 2014-11-18T14:59:44.353 回答
1

我对 FastCode MM Challenge 进行了(重新)测试,结果对于 TBB 来说并不是那么好(在块缩小测试中也出现内存不足异常)。

简而言之:ScaleMM2 和 Google TCmalloc 在这个复杂的测试中是最快的,Fastmm 和 ScaleMM2 使用的内存最少。

Average Speed Performance: (Scaled so that the winner = 100%)
  XE6         :   70,4
  TCmalloc    :   89,1
  ScaleMem2   :  100,0
  TBBMem      :   77,8

Average Memory Performance: (Scaled so that the winner = 100%)
  XE6         :  100,0
  TCmalloc    :   29,6
  ScaleMem2   :   75,6
  TBBMem      :   38,4

FastCode 挑战:https
://code.google.com/p/scalemm/source/browse/#svn%2Ftrunk%2FChallenge TBB 4.3:https ://www.threadingbuildingblocks.org/download

于 2014-11-22T20:44:21.670 回答