3

我正在写一个写事件日志的线程。当应用程序(优雅地)关闭时,我需要确保该线程在释放之前完成其保存日志的工作。如果我Free直接调用线程,它不应该立即被销毁,它应该等到线程完成并且没有更多工作要做。

这是我的线程执行布局的方式:

procedure TEventLogger.Execute;
var
  L: TList;
  E: PEventLog; //Custom record pointer
begin
  while not Terminated do begin //Repeat continuously until terminated
    try
      E:= nil;
      L:= LockList; //Acquire locked queue of logs to be written
      try
        if L.Count > 0 then begin //Check if any logs exist in queue
          E:= PEventLog(L[0]); //Get next log from queue
          L.Delete(0); //Remove log from queue
        end;
      finally
        UnlockList;
      end;
      if E <> nil then begin
        WriteEventLog(E); //Actual call to save log
      end;
    except
      //Handle exception...
    end;
    Sleep(1);
  end;
end;

这是析构函数...

destructor TEventLogger.Destroy;
begin
  ClearQueue; //I'm sure this should be removed
  FQueue.Free;
  DeleteCriticalSection(FListLock);
  inherited;
end;

现在我已经知道,在Free被调用的时候,我应该提出一个标志,使得不可能再向队列中添加任何日志——它只需要完成已经存在的内容。我的问题是我知道当线程被释放时上面的代码将被强制切断。

Free当被调用时,我应该如何让这个线程完成它的工作?或者,如果这不可能,那么通常应该如何构造这个线程以实现这一点?

4

4 回答 4

13

如果我直接调用 Free 到线程,它不应该立即被销毁,它应该等到线程完成并且没有更多工作要做。

我认为您对销毁线程时会发生什么有轻微的误解。当您调用FreeaTThread时,析构函数中会发生以下情况:

  1. Terminate叫做。
  2. WaitFor叫做。
  3. 然后线程的析构函数的其余部分运行。

换句话说,调用Free已经完成了你所要求的,即通知线程方法它需要终止,然后等待它这样做。

由于您可以控制线程的方法,因此一旦检测到标志已设置Execute,您就可以在那里做尽可能多的工作。Terminated正如 Remy 建议的那样,您可以覆盖DoTerminate并在那里完成最后的工作。


对于它的价值,这是实现队列的糟糕方法。那个呼唤我就Sleep(1)跳出来了。你需要的是一个阻塞队列。您清空队列,然后等待事件。当生产者添加到队列中时,会发出事件信号,以便您的线程可以唤醒。

于 2013-02-22T15:43:42.483 回答
6

这是我对如何编写消费者线程的看法。拼图的第一部分是阻塞队列。我的看起来像这样:

unit BlockingQueue;

interface

uses
  Windows, SyncObjs, Generics.Collections;

type
  TBlockingQueue<T> = class
  //see Duffy, Concurrent Programming on Windows, pp248
  private
    FCapacity: Integer;
    FQueue: TQueue<T>;
    FLock: TCriticalSection;
    FNotEmpty: TEvent;
    function DoEnqueue(const Value: T; IgnoreCapacity: Boolean): Boolean;
  public
    constructor Create(Capacity: Integer=-1);//default to unbounded
    destructor Destroy; override;
    function Enqueue(const Value: T): Boolean;
    procedure ForceEnqueue(const Value: T);
    function Dequeue: T;
  end;

implementation

{ TBlockingQueue<T> }

constructor TBlockingQueue<T>.Create(Capacity: Integer);
begin
  inherited Create;
  FCapacity := Capacity;
  FQueue := TQueue<T>.Create;
  FLock := TCriticalSection.Create;
  FNotEmpty := TEvent.Create(nil, True, False, '');
end;

destructor TBlockingQueue<T>.Destroy;
begin
  FNotEmpty.Free;
  FLock.Free;
  FQueue.Free;
  inherited;
end;

function TBlockingQueue<T>.DoEnqueue(const Value: T; IgnoreCapacity: Boolean): Boolean;
var
  WasEmpty: Boolean;
begin
  FLock.Acquire;
  Try
    Result := IgnoreCapacity or (FCapacity=-1) or (FQueue.Count<FCapacity);
    if Result then begin
      WasEmpty := FQueue.Count=0;
      FQueue.Enqueue(Value);
      if WasEmpty then begin
        FNotEmpty.SetEvent;
      end;
    end;
  Finally
    FLock.Release;
  End;
end;

function TBlockingQueue<T>.Enqueue(const Value: T): Boolean;
begin
  Result := DoEnqueue(Value, False);
end;

procedure TBlockingQueue<T>.ForceEnqueue(const Value: T);
begin
  DoEnqueue(Value, True);
end;

function TBlockingQueue<T>.Dequeue: T;
begin
  FLock.Acquire;
  Try
    while FQueue.Count=0 do begin
      FLock.Release;
      Try
        FNotEmpty.WaitFor;
      Finally
        FLock.Acquire;
      End;
    end;
    Result := FQueue.Dequeue;
    if FQueue.Count=0 then begin
      FNotEmpty.ResetEvent;
    end;
  Finally
    FLock.Release;
  End;
end;

end.

它是完全线程安全的。任何线程都可以入队。任何线程都可以出队。如果队列为空,则 dequeue 函数将阻塞。队列可以在有界或无界模式下操作。

接下来我们需要一个与这样的队列一起工作的线程。该线程只是将作业从队列中拉出,直到被告知终止。我的消费者线程如下所示:

unit ConsumerThread;

interface

uses
  SysUtils, Classes, BlockingQueue;

type
  TConsumerThread = class(TThread)
  private
    FQueue: TBlockingQueue<TProc>;
    FQueueFinished: Boolean;
    procedure SetQueueFinished;
  protected
    procedure TerminatedSet; override;
    procedure Execute; override;
  public
    constructor Create(Queue: TBlockingQueue<TProc>);
  end;

implementation

{ TConsumerThread }

constructor TConsumerThread.Create(Queue: TBlockingQueue<TProc>);
begin
  inherited Create(False);
  FQueue := Queue;
end;

procedure TConsumerThread.SetQueueFinished;
begin
  FQueueFinished := True;
end;

procedure TConsumerThread.TerminatedSet;
begin
  inherited;
  //ensure that, if the queue is empty, we wake up the thread so that it can quit
  FQueue.ForceEnqueue(SetQueueFinished);
end;

procedure TConsumerThread.Execute;
var
  Proc: TProc;
begin
  while not FQueueFinished do begin
    Proc := FQueue.Dequeue();
    Proc();
    Proc := nil;//clear Proc immediately, rather than waiting for Dequeue to return since it blocks
  end;
end;

end.

这具有您正在寻找的特性。即当线程被销毁时,它将在完成析构函数之前处理所有待处理的任务。

要查看它的实际效果,这里有一个简短的演示程序:

unit Main;

interface

uses
  Windows, SysUtils, Classes, Controls, Forms, StdCtrls,
  BlockingQueue, ConsumerThread;

type
  TMainForm = class(TForm)
    Memo1: TMemo;
    TaskCount: TEdit;
    Start: TButton;
    Stop: TButton;
    procedure StartClick(Sender: TObject);
    procedure StopClick(Sender: TObject);
  private
    FQueue: TBlockingQueue<TProc>;
    FThread: TConsumerThread;
    procedure Proc;
    procedure Output(const Msg: string);
  end;

implementation

{$R *.dfm}

procedure TMainForm.Output(const Msg: string);
begin
  TThread.Synchronize(FThread,
    procedure
    begin
      Memo1.Lines.Add(Msg);
    end
  );
end;

procedure TMainForm.Proc;
begin
  Output(Format('Consumer thread ID: %d', [GetCurrentThreadId]));
  Sleep(1000);
end;

procedure TMainForm.StartClick(Sender: TObject);
var
  i: Integer;
begin
  Memo1.Clear;
  Output(Format('Main thread ID: %d', [GetCurrentThreadId]));
  FQueue := TBlockingQueue<TProc>.Create;
  FThread := TConsumerThread.Create(FQueue);
  for i := 1 to StrToInt(TaskCount.Text) do
    FQueue.Enqueue(Proc);
end;

procedure TMainForm.StopClick(Sender: TObject);
begin
  Output('Stop clicked, calling thread destructor');
  FreeAndNil(FThread);
  Output('Thread destroyed');
  FreeAndNil(FQueue);
end;

end.

object MainForm: TMainForm
  Caption = 'MainForm'
  ClientHeight = 560
  ClientWidth = 904
  object Memo1: TMemo
    Left = 0
    Top = 96
    Width = 904
    Height = 464
    Align = alBottom
  end
  object TaskCount: TEdit
    Left = 8
    Top = 8
    Width = 121
    Height = 21
    Text = '10'
  end
  object Start: TButton
    Left = 8
    Top = 48
    Width = 89
    Height = 23
    Caption = 'Start'
    OnClick = StartClick
  end
  object Stop: TButton
    Left = 120
    Top = 48
    Width = 75
    Height = 23
    Caption = 'Stop'
    OnClick = StopClick
  end
end
于 2013-02-22T21:55:05.337 回答
3

修改你的代码,我建议同时检查最后一个队列计数,注意LastCount我在这里介绍的变量:

procedure TEventLogger.Execute;
var
  L: TList;
  E: PEventLog; //Custom record pointer
  LastCount: integer;
begin
  LastCount:=0;//counter warning
  while not (Terminated and (LastCount=0)) do begin //Repeat continuously until terminated
    try
      E:= nil;
      L:= LockList; //Acquire locked queue of logs to be written
      try
        LastCount:=L.Count;
        if LastCount > 0 then begin //Check if any logs exist in queue
          E:= PEventLog(L[0]); //Get next log from queue
          L.Delete(0); //Remove log from queue
        end;
      finally
        UnlockList;
      end;
      if E <> nil then begin
        WriteEventLog(E); //Actual call to save log
      end;
    except
      //Handle exception...
    end;
    Sleep(1);
  end;
end;
于 2013-02-22T16:27:46.157 回答
3

这是一个“惰性”EventLogger 线程,它将保存队列中的所有事件。

unit EventLogger;

interface

uses
  Classes, SyncObjs, Contnrs;

type
  TEventItem = class
    TimeStamp : TDateTime;
    Info : string;
  end;

  TEventLogger = class( TThread )
  private
    FStream : TStream;
    FEvent :  TEvent;
    FQueue :  TThreadList;
  protected
    procedure TerminatedSet; override;
    procedure Execute; override;
    procedure WriteEvents;
    function GetFirstItem( out AItem : TEventItem ) : Boolean;
  public
    constructor Create; overload;
    constructor Create( CreateSuspended : Boolean ); overload;
    destructor Destroy; override;

    procedure LogEvent( const AInfo : string );
  end;

implementation

uses
  Windows, SysUtils;

{ TEventLogger }

constructor TEventLogger.Create( CreateSuspended : Boolean );
begin
  FEvent := TEvent.Create;
  FQueue := TThreadList.Create;

  inherited;
end;

constructor TEventLogger.Create;
begin
  Create( False );
end;

destructor TEventLogger.Destroy;
begin
  // first the inherited part
  inherited;
  // now freeing the internal instances
  FStream.Free;
  FQueue.Free;
  FEvent.Free;
end;

procedure TEventLogger.Execute;
var
  LFinished : Boolean;
begin
  inherited;
  LFinished := False;
  while not LFinished do
    begin

      // waiting for event with 20 seconds timeout
      // maybe terminated or full queue
      WaitForSingleObject( FEvent.Handle, 20000 );

      // thread will finished if terminated
      LFinished := Terminated;

      // write all events from queue
      WriteEvents;

      // if the thread gets terminated while writing
      // it will be still not finished ... and therefor one more loop

    end;
end;

function TEventLogger.GetFirstItem( out AItem : TEventItem ) : Boolean;
var
  LList : TList;
begin
  LList := FQueue.LockList;
  try
    if LList.Count > 0
    then
      begin
        AItem := TEventItem( LList[0] );
        LList.Delete( 0 );
        Result := True;
      end
    else
      Result := False;
  finally
    FQueue.UnlockList;
  end;
end;

procedure TEventLogger.LogEvent( const AInfo : string );
var
  LList : TList;
  LItem : TEventItem;
begin
  if Terminated
  then
    Exit;

  LItem           := TEventItem.Create;
  LItem.TimeStamp := now;
  LItem.Info      := AInfo;

  LList := FQueue.LockList;
  try

    LList.Add( LItem );

    // if the queue is "full" we will set the event

    if LList.Count > 50
    then
      FEvent.SetEvent;

  finally
    FQueue.UnlockList;
  end;

end;

procedure TEventLogger.TerminatedSet;
begin
  // this is called if the thread is terminated
  inherited;
  FEvent.SetEvent;
end;

procedure TEventLogger.WriteEvents;
var
  LItem :   TEventItem;
  LStream : TStream;
begin
  // retrieve the first event in list
  while GetFirstItem( LItem ) do
    try

      // writing the event to a file

      if not Assigned( FStream )
      then
        FStream := TFileStream.Create( ChangeFileExt( ParamStr( 0 ), '.log' ), fmCreate or fmShareDenyWrite );

      // just a simple log row
      LStream := 
        TStringStream.Create( 
          Format( 
            '[%s] %s : %s', 
             // when it is written to file
            [FormatDateTime( 'dd.mm.yyyy hh:nn:ss.zzz', now ),
             // when did it happend
             FormatDateTime( 'dd.mm.yyyy hh:nn:ss.zzz', LItem.TimeStamp ),
             // whats about 
             LItem.Info] ) + sLineBreak, 
          TEncoding.UTF8 );
      try
        LStream.Seek( 0, soFromBeginning );
        FStream.CopyFrom( LStream, LStream.Size );
      finally
        LStream.Free;
      end;

    finally
      LItem.Free;
    end;
end;

end.
于 2013-02-22T17:28:56.557 回答