0

我想我可能需要重新考虑我的设计。我很难缩小导致我的计算机完全挂起的错误的范围,有时会从 VS 2010 抛出 HRESULT 0x8007000E。

我有一个控制台应用程序(稍后将转换为服务),它处理基于数据库队列的文件传输。

我正在限制允许传输的线程。这是因为我们连接的某些系统只能包含来自某些帐户的一定数量的连接。

例如,系统 A 只能接受 3 个同时连接(这意味着 3 个单独的线程)。这些线程中的每一个都有自己独特的连接对象,因此我们不应该遇到任何同步问题,因为它们不共享连接。

我们希望循环处理来自这些系统的文件。因此,例如,我们将允许 3 个连接,每个连接最多可以传输 100 个文件。这意味着,要从系统 A 移动 1000 个文件,我们每个周期只能处理 300 个文件,因为允许 3 个线程,每个线程 100 个文件。因此,在此传输的整个生命周期中,我们将拥有 10 个线程。我们一次只能运行 3 个。因此,将有 3 个周期,最后一个周期将只使用 1 个线程来传输最后 100 个文件。(3 个线程 x 100 个文件 = 每个周期 300 个文件)

当前的架构示例是:

  1. System.Threading.Timer 每 5 秒检查一次队列,通过调用 GetScheduledTask()
  2. 如果没有什么,GetScheduledTask() 根本什么都不做
  3. 如果有工作,则创建一个ThreadPool线程来处理工作【工作线程A】
  4. 工作线程 A 看到有 1000 个文件要传输
  5. 工作线程 A 发现它只能有 3 个线程运行到它从中获取文件的系统
  6. 工作线程 A 启动三个新的工作线程 [B,C,D] 并转移
  7. 工作线程 A 等待 B,C,D[WaitHandle.WaitAll(transfersArray)]
  8. 工作线程A看到队列中还有更多文件(现在应该是700)
  9. 工作线程 A 创建一个新数组以等待[transfersArray = new TransferArray[3]系统 A 的最大值,但可能因系统而异
  10. 工作线程 A 启动三个新的工作线程 [B,C,D] 并等待它们[WaitHandle.WaitAll(transfersArray)]
  11. 重复该过程,直到没有更多文件要移动。
  12. 工作线程 A 表示它已完成

我正在使用 ManualResetEvent 来处理信号。

我的问题是:

  1. 是否有任何明显的情况会导致资源泄漏或我遇到的问题?
  2. 我是否应该在每次调用后遍历数组WaitHandle.WaitAll(array)并调用array[index].Dispose()?
  3. 此进程的任务管理器下的句柄计数缓慢上升
  4. 我正在从 System.Threading.Timer 调用工作线程 A 的初始创建。这会有什么问题吗?该计时器的代码是:

(一些调度类代码)

private ManualResetEvent _ResetEvent;

private void Start()
{
    _IsAlive = true;
    ManualResetEvent transferResetEvent = new ManualResetEvent(false);
    //Set the scheduler timer to 5 second intervals
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}

private void ScheduledTasks_Tick(object state)
{
    ManualResetEvent resetEvent = null;
    try
    {
        resetEvent = (ManualResetEvent)state;
        //Block timer until GetScheduledTasks() finishes
        _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
        GetScheduledTasks();
    }
    finally
    {
        _ScheduledTasks.Change(5000, 5000);
        Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
        resetEvent.Set();
    }
}


private void GetScheduledTask()
{
    try 
    { 
        //Check to see if the database connection is still up
        if (!_IsAlive)
        {
            //Handle
            _ConnectionLostNotification = true;
            return;
        }

        //Get scheduled records from the database
        ISchedulerTask task = null;

        using (DataTable dt = FastSql.ExecuteDataTable(
                _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
                new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
        {
            if (dt != null)
            {
                if (dt.Rows.Count == 1)
                {  //Only 1 row is allowed
                    DataRow dr = dt.Rows[0];

                    //Get task information
                    TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
                    task = ScheduledTaskFactory.CreateScheduledTask(taskType);

                    task.Description = dr["Description"].ToString();
                    task.IsEnabled = (bool)dr["IsEnabled"];
                    task.IsProcessing = (bool)dr["IsProcessing"];
                    task.IsManualLaunch = (bool)dr["IsManualLaunch"];
                    task.ProcessMachineName = dr["ProcessMachineName"].ToString();
                    task.NextRun = (DateTime)dr["NextRun"];
                    task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
                    task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
                    task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
                    task.SleepMinutes = (int)dr["SleepMinutes"];
                    task.ScheduleId = (int)dr["ScheduleId"];
                    task.CurrentRuns = (int)dr["CurrentRuns"];
                    task.TotalRuns = (int)dr["TotalRuns"];

                    SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
                    //Queue up task to worker thread and start
                    ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);     
                }
            }
        }

    }
    catch (Exception ex)
    {
        //Handle
    }
}

private void ThreadProc(object taskObject)
{
    SchedulerTask task = (SchedulerTask)taskObject;
    ScheduledTaskEngine engine = null;
    try
    {
        engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
        engine.StartTask(task.Task);    
    }
    catch (Exception ex)
    {
        //Handle
    }
    finally
    {
        task.TaskResetEvent.Set();
        task.TaskResetEvent.Dispose();
    }
}
4

4 回答 4

2

0x8007000E 是内存不足错误。那和句柄计数似乎指向资源泄漏。确保您正在处理每个实现IDisposable. 这包括ManualResetEvent您正在使用的 s 数组。

如果您有时间,您可能还想转换为使用 .NET 4.0Task类;它旨在更干净地处理这样的复杂场景。通过定义子Task对象,您可以减少总线程数(线程非常昂贵,不仅因为调度,还因为它们的堆栈空间)。

于 2010-07-16T20:34:52.557 回答
1

我正在寻找类似问题的答案(Handles Count 随着时间的推移而增加)。

我查看了您的应用程序架构,并想向您推荐一些可以帮助您的东西:

您听说过 IOCP(输入输出完成端口)吗?

我不确定使用 C# 实现这一点的难度,但在 C/C++ 中它是小菜一碟。通过使用它,您可以创建一个唯一的线程池(该池中的线程数通常定义为 PC 或服务器中处理器或处理器核心数的 2 倍)您将此池与 IOCP 句柄相关联,该池执行工作。请参阅这些函数的帮助: CreateIoCompletionPort(); PostQueuedCompletionStatus(); GetQueuedCompletionStatus();

一般来说,动态创建和退出线程可能很耗时,并导致性能损失和内存碎片。在 MSDN 和 google 中有数以千计的关于 IOCP 的文献。

于 2011-01-26T01:07:32.040 回答
0

我认为你应该重新考虑你的架构。您只能同时拥有 3 个连接的事实几乎是在乞求您使用 1 个线程来生成文件列表并使用 3 个线程来处理它们。您的生产者线程会将所有文件插入队列,当项目到达队列时,3 个消费者线程将出列并继续处理。阻塞队列可以显着简化代码。如果您使用的是 .NET 4.0,那么您可以利用BlockingCollection类。

public class Example
{
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>();

    public void Start()
    {
        var threads = new Thread[] 
            { 
                new Thread(Producer), 
                new Thread(Consumer), 
                new Thread(Consumer), 
                new Thread(Consumer) 
            };
        foreach (Thread thread in threads)
        {
            thread.Start();
        }
    }

    private void Producer()
    {
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
            ScheduledTask task = GetScheduledTask();
            if (task != null)
            {
                foreach (string file in task.Files)
                {
                    m_Queue.Add(task);
                }
            }
        }
    }

    private void Consumer()
    {
        // Make a connection to the resource that is assigned to this thread only.
        while (true)
        {
            string file = m_Queue.Take();
            // Process the file.
        }
    }
}

在上面的例子中,我肯定把事情过于简单化了,但我希望你能得到大致的想法。请注意,这要简单得多,因为线程同步的方式并不多(大多数将嵌入阻塞队列中),当然也没有使用WaitHandle对象。显然,您必须添加正确的机制才能优雅地关闭线程,但这应该相当容易。

于 2010-07-17T00:51:56.010 回答
0

事实证明,这个奇怪问题的根源与架构无关,而是因为将解决方案从 3.5 转换为 4.0。我重新创建了解决方案,没有执行任何代码更改,并且问题再也没有发生过。

于 2010-09-13T18:19:11.793 回答