3

我有一个队列,我可以在其中将不同的线程排入队列,所以我可以保证两件事:

  1. 请求被一一处理。
  2. 请求按到达顺序处理

第二点很重要。否则一个简单的临界区就足够了。我有不同的请求组,只有在一个组内必须满足这些点。来自不同组的请求可以并发运行。

它看起来像这样:

FTaskQueue.Enqueu('MyGroup');
try
  Do Something (running in context of some thread)
finally
  FTaskQueue.Dequeu('MyGroup');
end;

编辑:我已经删除了实际的实现,因为它隐藏了我想要解决的问题

我需要这个,因为我有一个基于 Indy 的 Web 服务器,它接受 http 请求。首先,我为请求找到一个对应的会话。然后为该会话执行请求(代码)。我可以为同一个会话获得多个请求(读取我可以在第一个仍在处理时获得新请求)并且它们必须以正确的到达顺序一个接一个地执行。所以我寻求一个可以在这种情况下使用的通用同步队列,以便可以对请求进行排队。我无法控制线程,每个请求都可以在不同的线程中执行。

解决此类问题的最佳(通常)方法是什么?问题是 Enqueue 和 Dequeue 必须是原子操作,以便保持正确的顺序。我当前的实现有一个很大的瓶颈,但它确实有效。

编辑:波纹管是原子入队/出队操作的问题

你通常会做这样的事情:

procedure Enqueue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoEnqueue;
  finally 
    LeaveCriticalSection(FCritSec);
  end;

  BlockTheCurrentThread; // here the thread blocks itself
end;

procedure Dequeue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoDequeue;
    UnblockTheNextThread; // here the thread unblocks another thread
  finally 
    LeaveCriticalSection(FCritSec);
  end;
end;

现在这里的问题是这不是原子的。如果您有一个线程已经在队列中,而另一个线程来并调用 Enqueue,则可能会发生第二个线程将离开临界区并尝试阻塞自己。现在线程调度程序将恢复第一个线程,它将尝试解除对下一个(第二个)线程的阻塞。但是第二个线程还没有被阻塞,所以什么也没有发生。现在第二个线程继续并阻塞自己,但这是不正确的,因为它不会被解除阻塞。如果阻塞在临界区内,则临界区永远不会离开,我们就会陷入死锁。

4

3 回答 3

9

另一种方法:

让每个请求线程都有一个最初未设置的手动重置事件。队列管理器是一个简单的对象,它维护此类事件的线程安全列表。Enqueue()和方法都将Dequeue()请求线程的事件作为参数。

type
  TRequestManager = class(TObject)
  strict private
    fCritSect: TCriticalSection;
    fEvents: TList<TEvent>;
  public
    constructor Create;
    destructor Destroy; override;

    procedure Enqueue(ARequestEvent: TEvent);
    procedure Dequeue(ARequestEvent: TEvent);
  end;

{ TRequestManager }

constructor TRequestManager.Create;
begin
  inherited Create;
  fCritSect := TCriticalSection.Create;
  fEvents := TList<TEvent>.Create;
end;

destructor TRequestManager.Destroy;
begin
  Assert((fEvents = nil) or (fEvents.Count = 0));
  FreeAndNil(fEvents);
  FreeAndNil(fCritSect);
  inherited;
end;

procedure TRequestManager.Dequeue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(fEvents.Count > 0);
    Assert(fEvents[0] = ARequestEvent);
    fEvents.Delete(0);
    if fEvents.Count > 0 then
      fEvents[0].SetEvent;
  finally
    fCritSect.Release;
  end;
end;

procedure TRequestManager.Enqueue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(ARequestEvent <> nil);
    if fEvents.Count = 0 then
      ARequestEvent.SetEvent
    else
      ARequestEvent.ResetEvent;
    fEvents.Add(ARequestEvent);
  finally
    fCritSect.Release;
  end;
end;

每个请求线程都调用Enqueue()队列管理器,然后等待自己的事件发出信号。然后它处理请求并调用Dequeue()

{ TRequestThread }

type
  TRequestThread = class(TThread)
  strict private
    fEvent: TEvent;
    fManager: TRequestManager;
  protected
    procedure Execute; override;
  public
    constructor Create(AManager: TRequestManager);
  end;

constructor TRequestThread.Create(AManager: TRequestManager);
begin
  Assert(AManager <> nil);
  inherited Create(TRUE);
  fEvent := TEvent.Create(nil, TRUE, FALSE, '');
  fManager := AManager;
  Resume;
end;

procedure TRequestThread.Execute;
begin
  fManager.Enqueue(fEvent);
  try
    fEvent.WaitFor(INFINITE);
    OutputDebugString('Processing request');
    Sleep(1000);
    OutputDebugString('Request processed');
  finally
    fManager.Dequeue(fEvent);
  end;
end;

{ TForm1 }

procedure TForm1.Button1Click(Sender: TObject);
var
  i: integer;
begin
  for i := 1 to 10 do
    TRequestThread.Create(fRequestManager);
end;

队列管理器锁定 inEnqueue()和 in的事件列表Dequeue()。如果列表为空,Enqueue()则在参数中设置事件,否则重置事件。然后它将事件附加到列表中。因此第一个线程可以继续请求,所有其他线程将阻塞。在Dequeue()事件从列表顶部删除,并设置下一个事件(如果有的话)。

这样,最后一个请求线程将导致下一个请求线程解除阻塞,完全不会暂停或恢复线程。此解决方案也不需要任何额外的线程或窗口,每个请求线程只需要一个事件对象。

于 2009-12-07T13:20:50.587 回答
2

我会考虑到您评论中的其他信息来回答。

如果您有许多线程需要序列化,那么您可以使用 Windows 免费提供的序列化机制。让每个队列成为具有自己的窗口和标准消息循环的线程。使用SendMessage()代替PostThreadMessage(),Windows 将负责阻塞发送线程,直到消息被处理,并确保保持正确的执行顺序。通过为每个请求组使用具有自己窗口的线程,您可以确保多个组仍然同时处理。

这是一个简单的解决方案,仅当请求本身可以在与其起源不同的线程上下文中处理时才有效,这在许多情况下应该不是问题。

于 2009-12-07T12:41:38.000 回答
0

你试过 Delphi 提供的 TThreadList 对象吗?

它是线程安全的,它为您管理锁。您在主线程内管理线程“外部”的列表。

当请求请求新任务时,您将其添加到列表中。当一个线程完成时,您可以使用 OnTerminate 事件调用列表中的下一个线程。

于 2009-12-08T15:49:21.773 回答