21

目前我正在开发一个 Delphi XE3 客户端/服务器应用程序来传输文件(使用 Indy FTP 组件)。客户端部分监视一个文件夹,获取其中的文件列表,将它们上传到服务器并删除原始文件。上传是由一个单独的线程完成的,该线程一个一个地处理文件。文件的范围从 0 到几千不等,它们的大小也有很大差异。

这是一个为 OSX 和 Windows 编译的 Firemonkey 应用程序,所以我不得不使用 TThread 而不是我更喜欢的 OmniThreadLibrary。我的客户报告说应用程序随机冻结。我无法复制它,但由于我对 TThread 没有太多经验,我可能在某处放置了死锁条件。我读了很多例子,但我仍然不确定一些多线程细节。

应用程序结构很简单:
主线程中的计时器检查文件夹并将有关每个文件的信息放入记录中,该记录进入通用 TList。此列表保存有关文件名称、大小、进度、文件是完全上传还是必须重试的信息。所有这些都显示在带有进度条等的网格中。这个列表只能由主线程访问。之后,通过调用 AddFile 方法(下面的代码)将列表中的项目发送到线程。该线程将所有文件存储在一个线程安全队列中,例如http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
当文件上传时,上传线程通知主线程调用同步。
主线程定期调用 Uploader.GetProgress 方法来检查当前文件进度并显示出来。这个函数实际上不是线程安全的,但它会导致死锁,还是只返回错误的数据?

什么是进行进度检查的安全有效的方法?

那么这种方法可以吗还是我错过了什么?你会怎么做?
例如,我想创建一个新线程只是为了读取文件夹内容。这意味着我使用的 TList 必须是线程安全的,但必须始终访问它以刷新 GUI 网格中显示的信息。不是所有的同步都会减慢 GUI 的速度吗?

我已经发布了下面的简化代码,以防有人想看它。如果没有,我很乐意听到一些关于我应该使用什么的意见。主要目标是在 OSX 和 Windows 上工作;能够显示有关所有文件的信息和当前文件的进度;无论文件的数量和大小如何,都能做出响应。

这是上传线程的代码。为了便于阅读,我删除了一些内容:

type
  TFileStatus = (fsToBeQueued, fsUploaded, fsQueued);
  TFileInfo = record
    ID: Integer;
    Path: String;
    Size: Int64;
    UploadedSize: Int64;
    Status: TFileStatus;
  end;

  TUploader = class(TThread)
  private
    FTP: TIdFTP;
    fQueue: TThreadedQueue<TFileInfo>;
    fCurrentFile: TFileInfo;
    FUploading: Boolean;
    procedure ConnectFTP;
    function UploadFile(aFileInfo: TFileInfo): String;
    procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    procedure SignalComplete;
    procedure SignalError(aError: String);
  protected
    procedure Execute; override;
  public
    property Uploading: Boolean read FUploading;
    constructor Create;
    destructor Destroy; override;
    procedure Terminate;
    procedure AddFile(const aFileInfo: TFileInfo);
    function GetProgress: TFileInfo;
  end;

procedure TUploader.AddFile(const aFileInfo: TFileInfo);
begin
  fQueue.Enqueue(aFileInfo);
end;

procedure TUploader.ConnectFTP;
begin
  ...
    FTP.Connect;
end;

constructor TUploader.Create;
begin
  inherited Create(false);
  FreeOnTerminate := false;
  fQueue := TThreadedQueue<TFileInfo>.Create;
  // Create the TIdFTP and set ports and other params
  ...
end;

destructor TUploader.Destroy;
begin
  fQueue.Close;
  fQueue.Free;
  FTP.Free;
  inherited;
end;

// Process the whole queue and inform the main thread of the progress
procedure TUploader.Execute;
var
  Temp: TFileInfo;
begin
  try
    ConnectFTP;
  except
    on E: Exception do
      SignalError(E.Message);
  end;

  // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
  while fQueue.Peek(fCurrentFile) = wrSignaled do
    try
      if UploadFile(fCurrentFile) = '' then
      begin
        fQueue.Dequeue(Temp);  // Delete the item from the queue if succesful
        SignalComplete;
      end;
    except
      on E: Exception do
        SignalError(E.Message);
    end;
end;

// Return the current file's info to the main thread. Used to update the progress indicators
function TUploader.GetProgress: TFileInfo;
begin
  Result := fCurrentFile;
end;

// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
begin
  fCurrentFile.UploadedSize := AWorkCount;
end;

procedure TUploader.SignalComplete;
begin
  Synchronize(
    procedure
    begin
      frmClientMain.OnCompleteFile(fCurrentFile);
    end);
end;

procedure TUploader.SignalError(aError: String);
begin
  try
    FTP.Disconnect;
  except
  end;
  if fQueue.Closed then
    Exit;

  Synchronize(
    procedure
    begin
      frmClientMain.OnUploadError(aError);
    end);
end;

// Clear the queue and terminate the thread
procedure TUploader.Terminate;
begin
  fQueue.Close;
  inherited;
end;

function TUploader.UploadFile(aFileInfo: TFileInfo): String;
begin
  Result := 'Error';
  try
    if not FTP.Connected then
      ConnectFTP;
    FUploading := true;
    FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));     
    Result := '';
  finally
    FUploading := false;
  end;
end;

以及与上传者交互的部分主线程:

......
// Main form
    fUniqueID: Integer;  // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted)
    fUploader: TUploader;         // The uploader thread
    fFiles: TList<TFileInfo>;
    fCurrentFileName: String;     // Used to display the progress
    function IndexOfFile(aID: Integer): Integer;    //Return the index of the record inside the fFiles given the file ID
  public
    procedure OnCompleteFile(aFileInfo: TFileInfo);
    procedure OnUploadError(aError: String);
  end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnUploadError(aError: String);
begin
  // show and log the error
end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo);
var
  I: Integer;
begin
  I := IndexOfFile(aFileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    aFileInfo.Status := fsUploaded;
    aFileInfo.UploadedSize := aFileInfo.Size;
    FFiles.Items[I] := aFileInfo;
    Inc(FFilesUploaded);
    TFile.Delete(aFileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

procedure TfrmClientMain.ProcessFolder;
var
  NewFiles: TStringDynArray;
  I, J: Integer;
  FileInfo: TFileInfo;
begin
    // Remove completed files from the list if it contains more than XX files
    while FFiles.Count > 1000 do
      if FFiles[0].Status = fsUploaded then
      begin
        Dec(FFilesUploaded);
        FFiles.Delete(0);
      end else
        Break;

    NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories);
    for I := 0 to Length(NewFiles) - 1 do
    begin
          FileInfo.ID := FUniqueID;
          Inc(FUniqueID);
          FileInfo.Path := NewFiles[I];
          FileInfo.Size := GetFileSizeByName(NewFiles[I]);
          FileInfo.UploadedSize := 0;
          FileInfo.Status := fsToBeQueued;
          FFiles.Add(FileInfo);

      if (I mod 100) = 0 then
      begin
        UpdateStatusLabel;
        grFiles.RowCount := FFiles.Count;
        Application.ProcessMessages;
        if fUploader = nil then
          break;
      end;
    end;

    // Send the new files and resend failed to the uploader thread
    for I := 0 to FFiles.Count - 1 do
      if (FFiles[I].Status = fsToBeQueued) then
      begin
        if fUploader = nil then
          Break;
        FileInfo := FFiles[I];
        FileInfo.Status := fsQueued;
        FFiles[I] := FileInfo;
        SaveDebug(1, 'Add:    ' + ExtractFileName(FFiles[I].Path));
        FUploader.AddFile(FFiles[I]);
      end;
end;

procedure TfrmClientMain.tmrGUITimer(Sender: TObject);
var
  FileInfo: TFileInfo;
  I: Integer;
begin
  if (fUploader = nil) or not fUploader.Uploading then
    Exit;
  FileInfo := fUploader.GetProgress;
  I := IndexOfFile(FileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    fFiles.Items[I] := FileInfo;
    fCurrentFileName := ExtractFileName(FileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

function TfrmClientMain.IndexOfFile(aID: Integer): Integer;
var
  I: Integer;
begin
  Result := -1;
  for I := 0 to FFiles.Count - 1 do
    if FFiles[I].ID = aID then
      Exit(I);
end;
4

3 回答 3

1

这可能是一个长镜头,但这是另一种可能性[前一个答案可能更有可能](我刚刚遇到过,但以前知道的):使用 Synchronize 可能导致死锁。这是一个关于为什么会发生这种情况的博客: Delphi-Workaround-for-TThread-SynchronizeWaitFor-.aspx

文章的相关点:

线程 A 调用 Synchronize(MethodA)

线程 B 调用 Synchronize(MethodB)

然后,在主线程的上下文中:

处理消息时主线程调用 CheckSynchronize()

CheckSynchronize 被实现为批处理所有等待的调用(*)。所以它会拾取等待调用的队列(包含 MethodA 和 MethodB)并一一循环遍历它们。

MethodA 在主线程的上下文中执行。假设 MethodA 调用 ThreadB.WaitFor

WaitFor 调用 CheckSynchronize 来处理任何对 Synchronize 的等待调用

理论上,这应该然后处理 ThreadB 的 Synchronize(MethodB),让 Thread B 完成。但是,MethodB 已经拥有第一个 CheckSynchronize 调用,因此它永远不会被调用。

僵局!

Embarcadero QC 文章更详细地描述了该问题。

虽然我在上面的代码中没有看到任何 ProcessMessages 调用,或者就此而言,在同步期间将调用 WaitFor,但在调用同步时,另一个线程将同步调用为好吧 - 但是主线程已经同步并且正在阻塞。

一开始我并没有这样做,因为我倾向于避免像瘟疫一样的同步调用,并且通常使用其他方法从线程设计 UI 更新,例如消息传递和带有消息通知的线程安全列表,而不是同步调用。

于 2013-04-02T21:56:51.180 回答
1

这可能不是问题,但 TFileInfo 是一个记录。

这意味着当作为(非 const/var)参数传递时,它会被复制。这可能会导致记录中的字符串等问题,在复制记录时不会更新引用计数。

要尝试的一件事是将其设为类并将实例作为参数传递(即指向堆上数据的指针)。

其他需要注意的是线程 32 位系统上的共享 Int64(例如您的大小值)。

更新/读取这些不是原子完成的,您没有任何特定的保护措施,因此读取值可能会由于线程而导致高低 32 位不匹配。(例如读取高 32 位,写入高 32 位,写入低 32 位,读取低 32 位,在不同线程中进行读写)。这可能不会导致您看到的问题,除非您正在处理 > 4GB 的文件传输,否则不太可能给您带来任何问题。

于 2013-03-02T04:06:51.587 回答
1

死锁肯定很难发现,但这可能是问题所在。在您的代码中,我没有看到您在 enqueue、peek 或 dequeue 中添加了任何超时 - 这意味着它将采用 Infinite 的默认值。

enqueue 中有这一行 - 这意味着,就像任何同步对象一样,它将阻塞直到 Enter 完成(它锁定监视器)或超时发生(因为你没有超时,它将永远等待)

TSimpleThreadedQueue.Enqueue(const Item: T; Timeout: LongWord): TWaitResult;
...    
if not TMonitor.Enter(FQueue, Timeout)

我还将假设您基于 Dequeue 自己实现了 PEEK——只是您实际上并没有删除该项目。

这似乎实现了自己的超时 - 但是,您仍然拥有以下内容:

function TSimpleThreadedQueue.Peek/Dequeue(var Item: T; Timeout: LongWord): TWaitResult;
...
if not TMonitor.Enter(FQueue, Timeout)

超时是无限的 - 所以,如果你在 peek 方法中等待它发出无限超时信号,那么你不能在不阻塞等待 peek 方法完成的情况下从第二个线程排队无限超时。

这是来自 TMonitor 的评论片段

Enter locks the monitor object with an optional timeout (in ms) value. 
Enter without a timeout will wait until the lock is obtained. 
If the procedure returns it can be assumed that the lock was acquired. 
Enter with a timeout will return a boolean status indicating whether or 
not the lock was obtained (True) or the attempt timed out prior to 
acquire the lock (False). Calling Enter with an INFINITE timeout 
is the same as calling Enter without a timeout.

由于实现默认使用 Infinite,并且没有提供 TMonitor.Spinlock 值,这将阻塞线程,直到它可以获取 FQueue 对象。

我的建议是更改您的代码如下:

  // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
  while true do
    case fQueue.Peek(fCurrentFile,10) 
      wrSignaled:
        try
          if UploadFile(fCurrentFile) = '' then
          begin
            fQueue.Dequeue(Temp);  // Delete the item from the queue if succesful
            SignalComplete;
          end;
        except
          on E: Exception do
            SignalError(E.Message);
        end;
      wrTimeout: sleep(10);
      wrIOCompletion,
      wrAbandoned,
      wrError: break;
    end; //case

这样,peek 就不会无限期地持有 FQueue 的锁,而是为 Enqueue 留下一个窗口来获取它并从主 (UI) 线程添加文件。

于 2013-03-22T22:03:03.463 回答