-1

我开发了一个接收 TStream 的程序;但基本类型,允许发送所有类型的流继承者。

此过程旨在为每个内核或多个线程创建一个线程。每个线程都会对流数据进行详细分析(只读),并且由于 Pascal 类是通过引用分配的,而不是按值分配的,因此会有线程冲突,因为读取位置是 intercalará。

为了解决这个问题,我希望该过程完成所有工作,将内存中的最后一个 TStream 加倍,并为其分配一个新变量。这样我可以复制足够数量的 TStream,以便每个线程都有一个唯一的 TStream。非常线程库内存结束后。

注意:程序在 DLL 中,线程工作。

注2:目标是程序做所有必要的服务,即在没有代码干预的情况下调用;您可以轻松地传递一个TStream 数组,而不仅仅是一个 TStream。但我不想要它!目的是服务完全由程序提供。

你知道怎么做吗?

谢谢你。

添加:

我有一个低级的想法,但我对 Pascal 的了解是有限的。

  1. 确定对象在内存中的地址及其大小。
  2. 在内存中创建一个与原始对象大小相同的新地址。
  3. 将整个内容(原始)对象复制到这个新地址。
  4. 我创建了一个指向 TStream 的指针,它指向内存中的这个新地址。

这会起作用,还是愚蠢?如果是,如何操作?请举例!

2º 加法:

举个例子,假设程序对加密流执行暴力攻击(只是一个例子,因为它不适用):

场景:一个 8 核 CPU 中的 30GB 文件:

1º - TMemoryStream:

创建 8 个 TMemoryStream 并为每个 TMemoryStreams 复制文件的全部内容。这将导致同时使用 240GB RAM。我考虑了这个错误的想法。此外,它会将处理时间增加到最快的程度,而不使用多线程。我将不得不将整个文件读入内存,然后加载,开始分析它。坏了!

 * TMemoryStream 的一个不好的替代方法是将文件慢慢复制到 TMemoryStream 中,以 100MB/核心(800MB)为单位,而不是占用内存。所以每个线程看起来只有 100MB,释放内存直到你完成整个文件。但问题是它需要 DLL 中的 Synchronize() 函数,我们知道这不起作用,因为我在Synchronize() 中打开问题 DLL 冻结而没有错误和崩溃

2º - TFileStream:

这在我看来更糟。看,我得到一个 TStream,创建 8 个 TFileStream 并为每个 TFileStream 复制所有 30GB。这很糟糕,因为在磁盘上占用了 240GB,这对于 HDD 来说也是一个很高的价值。HD中的读写时间(复制)将使多线程的实现比单线程更耗时。坏了!

结论: 上述两种方法都需要使用 synchronize() 将每个线程排队以读取文件。因此,即使在多核 CPU 上,线程也不会同时运行。我知道即使他可以同时访问文件(直接创建几个TFileStream),操作系统仍然enfileiraria线程一次读取文件,因为HDD不是真正的线程安全,他不能读取两个数据同时 。这是硬盘的物理限制!但是,与我手动实现 synchronize() 不同,OS 的排队管理更加有效,并有效地减少了潜在瓶颈。这证明了我克隆 TStream 的想法是合理的,它将留下所有管理文件访问队列的工作;没有任何干预——我知道他会比我做得更好。

例子

在上面的例子中,我想让 8 个线程同时不同地分析同一个 Stream,知道线程不知道提供了什么样的 Stream,它可以是一个文件 Stream,一个来自互联网的流,甚至是一个小的 TStringStream。主程序将只创建一个Strean,并带有配置参数。一个简单的例子:

TModeForceBrute = (M1, M2, M3, M4, M5...)
TModesFB = set of TModeForceBrute;

TService = record
  stream: TStream;
  modes: array of TModesFB;
end;

例如,应该可以只分析流 M1,只分析 M2,或同时分析 [M1,M2]。TModesFB 组合改变了流的分析方式。数组“modes”中的每个项目,作为一个任务列表,将由不同的线程处理。任务列表示例(JSON 表示):

{
  Stream: MyTstream,
  modes: [
    [M1, m5],
    [M1],
    [M5, m2],
    [M5, m2, m4, m3],
    [M1, m1, m3]
  ]
}

注意:在分析仪 [m1] + [m2] <> [m1, m2]。

在程序中:

function analysis(Task: TService; maxCores: integer): TMyResultType; external 'mydll.dll';

在 DLL 中:

// Basic, simple and fasted Exemple! May contain syntax errors or logical.
function analysis(Task: TService; maxCores: integer): TMyResultType; 
var 
  i, processors : integer;

begin
  processors := getCPUCount();

  if (maxCores < processors) and (maxCores > 0) then
    processors := maxCores;

  setlength (globalThreads, processors);

  for i := 0 to processors - 1 do
    // It is obvious that the counter modes in the original is not the same counter processors.
    if i < length(Task.modes) then begin
      globalThreads[i] := TAnalusysThread.create(true, Task.stream, Task.modes[i])
      globalThreads[i].start();
    end;

  [...]
end;

注意:使用单线程程序运行良好,没有已知错误。

我希望每个线程都负责一种分析,并且我不能在 DLL 中使用 Synchronize()。理解?有足够干净的解决方案吗?

4

2 回答 2

1

克隆流的代码如下:

streamdest:=TMemoryStream.create; 
streamsrc.position:=0; 
streamdest.copyfrom(streamdest); 
streamsrc.position:=0; 
streamdest.position:=0;

然而,在 DLL 边界上做事很难,因为 DLL 有自己的库和库状态副本。目前不建议这样做。

于 2016-02-14T21:16:54.533 回答
0

我正在回答我的问题,因为我认为没有人有一个非常好的解决方案。也许是因为没有!

因此,我采用了Marco van de VoortKen White的想法,作为一个解决方案,该解决方案使用 TMemoryStream 并在内存批处理 50MB 中部分加载,使用TRTLCriticalSection进行同步。

该解决方案还包含附加 2 中提到的相同缺点;他们是:

  1. 排队访问硬盘是我的程序而不是操作系统的责任;
  2. 单个线程在内存中携带两倍的相同数据。
  3. 取决于处理器速度,可能是线程很好地分析了快速的 50MB 内存;另一方面,加载内存可能非常慢。那会使使用多个线程按顺序运行,失去使用多线程的优势,因为每个线程都拥塞访问文件,按顺序运行就好像它们是单个线程一样。

所以我认为这个解决方案是一个肮脏的解决方案。但现在它有效!

下面我举一个简单的例子。这意味着这种改编可能包含明显的逻辑和/或语法错误。但这足以证明。

使用相同的问题示例,而不是将电流传递给“分析”,而是传递指向进程的指针。此过程负责同步读取 50MB 流批次。

DLL 和程序:

TLotLoadStream = function (var toStm: TMemoryStream; lot, id: integer): int64 of object;

TModeForceBrute = (M1, M2, M3, M4, M5...)
TModesFB = set of TModeForceBrute;

TaskTService = record
  reader: TLotLoadStream; {changes here <<<<<<< } 
  modes: array of TModesFB;
end;

在程序中:

type
{ another code here }
TForm1 = class(TForm)
  { another code here }

  CS    : TRTLCriticalSection;  
  stream: TFileStream;
  function MyReader(var toStm: TMemoryStream; lot: integer): int64 of object;

  { another code here }
end;

function analysis(Task: TService; maxCores: integer): TMyResultType; external 'mydll.dll';

{ another code here }

implementation

{ another code here }

function TForm1.MyReader(var toStm: TMemoryStream; lot: integer): int64 of object;
const
  lotSize = (1024*1024) * 50; // 50MB

var
  ler: int64;

begin
  result := -1;
  { 
    MUST BE PERFORMED PREVIOUSLY - FOR EXAMPLE IN TForm1.create()
    InitCriticalSection (self.CriticalSection);
  }   

  toStm.Clear;
  ler    := 0;

  { ENTERING IN CRITICAL SESSION  }
  EnterCriticalSection(self.CS);

  { POSITIONING IN LOT OF BEGIN}
  self.streamSeek(lot * lotSize, soBeginning);

  if (lot = 0) and (lotSize >= self.stream.size) then
    ler := self.stream.size
  else
    if self.stream.Size >= (lotSize + (lot * lotSize)) THEN
      ler := lotSize
    else
      ler := (self.stream.Size) - self.stream.Position; // stream inicia em 0?

  { COPYNG }
  if (ler > 0) then
    toStm.CopyFrom(self.stream, ler);

  { LEAVING THE CRITICAL SECTION }
  LeaveCriticalSection(self.CS);

  result := ler; 
end;

在 DLL 中:

{ another code here }
// Basic, simple and fasted Exemple! May contain syntax errors or logical.
function analysis(Task: TService; maxCores: integer): TMyResultType; 
var 
  i, processors : integer;

begin
  processors := getCPUCount();

  if (maxCores < processors) and (maxCores > 0) then
    processors := maxCores;

  setlength (globalThreads, processors);

  for i := 0 to processors - 1 do
    // It is obvious that the counter modes in the original is not the same counter processors.
    if i < length(Task.modes) then begin
      globalThreads[i] := TAnalusysThread.create(true, Task.reader, Task.modes[i])
      globalThreads[i].start();
    end;

  { another code here }
end;

在 DLL 线程类中:

type
{ another code here }
MyThreadAnalysis = class(TThread)
  { another code here }

  reader: TLotLoadStream;
  procedure Execute;

  { another code here }
end;

{ another code here }

implementation

{ another code here }

procedure MyThreadAnalysis.Execute;
var
  Stream: TMemoryStream;
  lot: integer;     

  {My analyzer already all written using buff, the job of rewriting it is too large, then it is so, two readings, two loads in memory, as I already mentioned in the question!}   
  buf: array[1..$F000] of byte; // 60K

begin         
  lot    := 0;  
  Stream := TMemoryStream.Create;     
  self.reader(stream, lot);

  while  (assigned(Stream)) and (Stream <> nil) and (Stream.Size > 0) then begin
    Stream.Seek(0, soBeginning);      

    { 2º loading to memory buf }        
    while (Stream.Position < Stream.Size) do begin
      n := Stream.read(buf, sizeof(buf));     

      { MY CODE HERE }
    end;

    inc(lot);
    self.reader(stream, lot, integer(Pchar(name)));     
  end;
end;

因此,正如所见,这是一个权宜之计的解决方案。我仍然希望找到一个干净的解决方案,让我可以将流量控制器加倍,这样访问数据是操作系统的责任,而不是我的程序。

于 2016-02-21T14:33:17.693 回答