43

尝试在单个生产者多消费者方案中使用 TThreadedQueue (Generics.Collections)。(德尔福-XE)。这个想法是将对象推入队列并让多个工作线程排空队列。

但是,它没有按预期工作。当两个或更多工作线程正在调用 PopItem 时,会从 TThreadedQueue 引发访问冲突。

如果对 PopItem 的调用使用临界区进行序列化,则一切正常。

当然 TThreadedQueue 应该能够处理多个消费者,所以我是否遗漏了什么或者这是 TThreadedQueue 中的纯错误?

这是一个产生错误的简单示例。

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

更新:TMonitor 中导致 TThreadedQueue 崩溃的错误已在 Delphi XE2 中修复。

更新 2:上述测试强调队列处于空状态。Darian Miller 发现在满状态下强调队列,仍然可以重现 XE2 中的错误。错误再次出现在 TMonitor 中。有关更多信息,请参阅下面的答案。还有一个指向 QC101114 的链接。

更新 3:随着 Delphi-XE2 更新 4,有一个已宣布的修复程序TMonitor可以解决TThreadedQueue. 到目前为止,我的测试无法再重现任何错误TThreadedQueue。当队列为空且已满时测试单个生产者/多个消费者线程。还测试了多个生产者/多个消费者。我将读取线程和写入线程从 1 更改为 100,没有任何故障。但知道历史,我敢于别人打破TMonitor

4

5 回答 5

19

好吧,如果没有大量测试就很难确定,但它看起来确实是一个错误,无论是在 TThreadedQueue 还是在 TMonitor 中。无论哪种方式,它都在 RTL 中,而不是您的代码中。您应该将此作为 QC 报告归档,并使用上面的示例作为“如何重现”代码。

于 2011-01-31T23:45:11.947 回答
10

我建议您在处理线程、并行性等时使用 OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary。Primoz做得非常好,在该站点上您会找到很多有用的文档.

于 2011-02-01T08:22:48.310 回答
4

您的示例在 XE2 下似乎运行良好,但如果我们填充您的队列,它会在 PushItem 上出现 AV 失败。(在 XE2 Update1 下测试)

要重现,只需将您的任务创建从 100 增加到 1100(您的队列深度设置为 1024)

for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

这对我来说每次在 Windows 7 上都死掉了。我最初尝试持续推动对它进行压力测试,但它在循环 30 处失败......然后在循环 16......然后在 65 处,所以在不同的时间间隔,但它始终在某些时候失败观点。

  iLoop := 0;
  while iLoop < 1000 do
  begin
    Inc(iLoop);
    WriteLn('Loop: ' + IntToStr(iLoop));  
    for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
  end;
于 2011-11-17T22:08:36.523 回答
3

我寻找 TThreadedQueue 类,但在我的 D2009 中似乎没有。我不会因此而自杀-Delphi 线程支持一直是错误的.. errm...'非最佳',我怀疑 TThreadedQueue 也不例外:)

为什么对 PC(生产者/消费者)对象使用泛型?一个简单的 TObjectQueue 后代就可以了 - 几十年来一直在使用它 - 可以与多个生产者/消费者一起正常工作:

unit MinimalSemaphorePCqueue;

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore.

The semaphore count reflects the queue count
'push' will always succeed unless memory runs out, then you're stuft anyway.
'pop' has a timeout parameter as well as the address of where any received
object is to be put.
'pop' returns immediately with 'true' if there is an object on the queue
available for it.
'pop' blocks the caller if the queue is empty and the timeout is not 0.
'pop' returns false if the timeout is exceeded before an object is available
from the queue.
'pop' returns true if an object is available from the queue before the timeout
is exceeded.
If multiple threads have called 'pop' and are blocked because the queue is
empty, a single 'push' will make only one of the waiting threads ready.


Methods to push/pop from the queue
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
When the handle is signaled, the 'peek' method will retrieve the queued object.
}
interface

uses
  Windows, Messages, SysUtils, Classes,syncObjs,contnrs;


type

pObject=^Tobject;


TsemaphoreMailbox=class(TobjectQueue)
private
  countSema:Thandle;
protected
  access:TcriticalSection;
public
  property semaHandle:Thandle read countSema;
  constructor create; virtual;
  procedure push(aObject:Tobject); virtual;
  function pop(pResObject:pObject;timeout:DWORD):boolean;  virtual;
  function peek(pResObject:pObject):boolean;  virtual;
  destructor destroy; override;
end;


implementation

{ TsemaphoreMailbox }

constructor TsemaphoreMailbox.create;
begin
{$IFDEF D2009}
   inherited Create;
{$ELSE}
  inherited create;
{$ENDIF}
  access:=TcriticalSection.create;
  countSema:=createSemaphore(nil,0,maxInt,nil);
end;

destructor TsemaphoreMailbox.destroy;
begin
  access.free;
  closeHandle(countSema);
  inherited;
end;

function TsemaphoreMailbox.pop(pResObject: pObject;
  timeout: DWORD): boolean;
// dequeues an object, if one is available on the queue.  If the queue is empty,
// the caller is blocked until either an object is pushed on or the timeout
// period expires
begin // wait for a unit from the semaphore
  result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
  if result then // if a unit was supplied before the timeout,
  begin
    access.acquire;
    try
      pResObject^:=inherited pop; // get an object from the queue
    finally
      access.release;
    end;
  end;
end;

procedure TsemaphoreMailbox.push(aObject: Tobject);
// pushes an object onto the queue.  If threads are waiting in a 'pop' call,
// one of them is made ready.
begin
  access.acquire;
  try
    inherited push(aObject); // shove the object onto the queue
  finally
    access.release;
  end;
  releaseSemaphore(countSema,1,nil); // release one unit to semaphore
end;

function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
begin
  access.acquire;
  try
    result:=(count>0);
    if result then pResObject^:=inherited pop; // get an object from the queue
  finally
    access.release;
  end;
end;
end.
于 2011-05-17T20:31:02.813 回答
1

我不认为 TThreadedQueue 应该支持多个消费者。根据帮助文件,这是一个 FIFO。我的印象是有一个线程正在推动,而另一个(只有一个!)正在弹出。

于 2011-02-01T13:27:45.970 回答