我想我可能需要重新考虑我的设计。我很难缩小导致我的计算机完全挂起的错误的范围,有时会从 VS 2010 抛出 HRESULT 0x8007000E。
我有一个控制台应用程序(稍后将转换为服务),它处理基于数据库队列的文件传输。
我正在限制允许传输的线程。这是因为我们连接的某些系统只能包含来自某些帐户的一定数量的连接。
例如,系统 A 只能接受 3 个同时连接(这意味着 3 个单独的线程)。这些线程中的每一个都有自己独特的连接对象,因此我们不应该遇到任何同步问题,因为它们不共享连接。
我们希望循环处理来自这些系统的文件。因此,例如,我们将允许 3 个连接,每个连接最多可以传输 100 个文件。这意味着,要从系统 A 移动 1000 个文件,我们每个周期只能处理 300 个文件,因为允许 3 个线程,每个线程 100 个文件。因此,在此传输的整个生命周期中,我们将拥有 10 个线程。我们一次只能运行 3 个。因此,将有 3 个周期,最后一个周期将只使用 1 个线程来传输最后 100 个文件。(3 个线程 x 100 个文件 = 每个周期 300 个文件)
当前的架构示例是:
- System.Threading.Timer 每 5 秒检查一次队列,通过调用 GetScheduledTask()
- 如果没有什么,GetScheduledTask() 根本什么都不做
- 如果有工作,则创建一个ThreadPool线程来处理工作【工作线程A】
- 工作线程 A 看到有 1000 个文件要传输
- 工作线程 A 发现它只能有 3 个线程运行到它从中获取文件的系统
- 工作线程 A 启动三个新的工作线程 [B,C,D] 并转移
- 工作线程 A 等待 B,C,D
[WaitHandle.WaitAll(transfersArray)]
- 工作线程A看到队列中还有更多文件(现在应该是700)
- 工作线程 A 创建一个新数组以等待
[transfersArray = new TransferArray[3]
系统 A 的最大值,但可能因系统而异 - 工作线程 A 启动三个新的工作线程 [B,C,D] 并等待它们
[WaitHandle.WaitAll(transfersArray)]
- 重复该过程,直到没有更多文件要移动。
- 工作线程 A 表示它已完成
我正在使用 ManualResetEvent 来处理信号。
我的问题是:
- 是否有任何明显的情况会导致资源泄漏或我遇到的问题?
- 我是否应该在每次调用后遍历数组
WaitHandle.WaitAll(array)
并调用array[index].Dispose()?
- 此进程的任务管理器下的句柄计数缓慢上升
- 我正在从 System.Threading.Timer 调用工作线程 A 的初始创建。这会有什么问题吗?该计时器的代码是:
(一些调度类代码)
private ManualResetEvent _ResetEvent;
private void Start()
{
_IsAlive = true;
ManualResetEvent transferResetEvent = new ManualResetEvent(false);
//Set the scheduler timer to 5 second intervals
_ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}
private void ScheduledTasks_Tick(object state)
{
ManualResetEvent resetEvent = null;
try
{
resetEvent = (ManualResetEvent)state;
//Block timer until GetScheduledTasks() finishes
_ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
GetScheduledTasks();
}
finally
{
_ScheduledTasks.Change(5000, 5000);
Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
resetEvent.Set();
}
}
private void GetScheduledTask()
{
try
{
//Check to see if the database connection is still up
if (!_IsAlive)
{
//Handle
_ConnectionLostNotification = true;
return;
}
//Get scheduled records from the database
ISchedulerTask task = null;
using (DataTable dt = FastSql.ExecuteDataTable(
_ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
{
if (dt != null)
{
if (dt.Rows.Count == 1)
{ //Only 1 row is allowed
DataRow dr = dt.Rows[0];
//Get task information
TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
task = ScheduledTaskFactory.CreateScheduledTask(taskType);
task.Description = dr["Description"].ToString();
task.IsEnabled = (bool)dr["IsEnabled"];
task.IsProcessing = (bool)dr["IsProcessing"];
task.IsManualLaunch = (bool)dr["IsManualLaunch"];
task.ProcessMachineName = dr["ProcessMachineName"].ToString();
task.NextRun = (DateTime)dr["NextRun"];
task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
task.SleepMinutes = (int)dr["SleepMinutes"];
task.ScheduleId = (int)dr["ScheduleId"];
task.CurrentRuns = (int)dr["CurrentRuns"];
task.TotalRuns = (int)dr["TotalRuns"];
SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
//Queue up task to worker thread and start
ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);
}
}
}
}
catch (Exception ex)
{
//Handle
}
}
private void ThreadProc(object taskObject)
{
SchedulerTask task = (SchedulerTask)taskObject;
ScheduledTaskEngine engine = null;
try
{
engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
engine.StartTask(task.Task);
}
catch (Exception ex)
{
//Handle
}
finally
{
task.TaskResetEvent.Set();
task.TaskResetEvent.Dispose();
}
}