我们从未为此找到答案,但我们创建了自己的解决方法,似乎可以解决问题。为了完整起见,我将在此处发布。我希望它可以帮助面临类似情况的其他人。
要求:
- 我们有一个长时间运行的任务,它将在硬件服务器上运行。当我说长期运行时,我的意思是从一天到很多天。
- 我们希望有一个可以在网络中的任何其他桌面上启动的用户界面,以便以图形方式查看长期运行任务的统计信息。
- 用户界面可以多次启动和停止,并且可以同时出现多个实例。
- 用户界面不应对长时间运行的任务产生过多的负担。运行多个 UI 不应减慢它的速度。
该设计:
- 长时间运行的任务包含在 DLL 中。有一个带有 run() 方法的主类可以启动长时间运行的任务。
- 我们创建了一个将在硬件服务器上自动运行的 Windows 服务。
- Windows 服务将创建主类的实例并通过调用 run() 方法启动任务。
- Windows 服务还将创建一个 ServiceHost 实例并启动一个 WCF 服务实例。
- Windows 服务会将主类的引用传递给 WCF 服务。
- WCF 服务将为主类可以引发的六个事件创建处理程序。
- 从主类到 WCF 服务的所有通信都是通过引发这六个事件的一种方式。
- UI 将是 WCF 服务的客户端,并且连接将使用 NetTcp 绑定。
- WCF 服务有一个 subscribe() 方法和一个 unsubscribe() 方法,以便潜在的 UI 可以加入和离开。
- 当 UI 调用 subscribe() 方法时,它会将唯一标识符作为字符串传递。WCF 服务将标识符及其 OperationContext 放入 ConcurrentDictionary。
- 当 UI 调用 unsubscribe() 方法时,该条目将从 ConcurrentDictionary 中删除。
- UI 和 WCF 服务之间的契约具有从 WCF 服务到客户端的单向消息,用于长时间运行的任务可能引发的每种类型的事件。
- 在长时间运行的任务期间引发事件时,WCF 服务会处理该事件并遍历已注册的 UI 并向 UI 发送单向消息。
所有这一切都在这一点上起作用。
问题:
当我们对该系统进行压力测试时,我们创建了一个场景,其中长时间运行的任务尽可能快地用事件轰炸 WCF 服务。这将是最坏的情况,但我们必须能够处理它。WCF 服务能够处理事件并将消息放在 Tcp 通道上。由于消息是单向的,因此 WCF 服务不会阻止等待发送完成,这使其能够跟上正在引发的事件。
当用户界面没有将消息从通道中拉出的速度不如服务器将它们推入通道时,就会出现问题。消息会备份并最终开始超时并导致通道进入故障状态。我们希望在故障状态发生之前检测到这种情况,以便我们可以开始丢弃消息。不幸的是,我们找不到任何机制来检测此频道上的积压。如果我们将消息更改为双向,WCF 服务将阻塞,直到消息完成并且通道不会被备份,但是,这会影响长时间运行的服务并使其变慢。不好。
解决方案:
我们通过在包含长时间运行任务的同一个 DLL 中创建一个特殊类来解决这个问题。此类负责与任何附加的用户界面进行通信。此通信对象包含要引发的每个事件的 ConcurrentQueue。当长时间运行的任务通常将事件引发回 WCF 服务时,它现在将改为调用此通信对象中的方法。
在此方法中,通信对象会将事件 args 输入到该事件的 ConcurrentQueue 中。通信对象还有一个方法,该方法在创建对象时在单独的线程上启动。这种新方法将不断循环通过 concurrentQueues 并弹出事件参数并实际引发事件。我们将 NetTcp 调用改为双向调用,因此线程中的例程将绑定到 TCP 通道的速度,但由于它在单独的线程中,它不会减慢长时间运行任务的主处理。
现在我们有了一个可以使用的 ConcurrentQueue,我们可以检查积压工作。我们在逻辑上为并发队列设置了一些限制(在当前情况下为 10)。当长时间运行的任务调用该方法将事件 args 添加到队列中时,它首先检查队列的计数,如果它小于我们的逻辑限制,它将事件 args 入队,否则它只是将其丢弃并继续。这样长运行队列的速度不会受到影响,WCF 服务也不会备份并导致通道状态出现故障。
总之:
我们欢迎任何反馈或替代想法。这对我们来说似乎工作得很好,而且似乎是有弹性的。
class UI
{
#region Class Scoped Variables
private Int32 _threashold = 10;
private bool _continue = true;
#endregion Class Scoped Variables
#region Public Delegate Definitions
public delegate void OnPlanSelectionChangedDelegate(PlanSelectionChangedEventArgs e);
// other lines deleted for brevity
#endregion Public Delegate Definitions
#region Local Delegate Instances
private OnPlanSelectionChangedDelegate _onPlanSelectionChangedDelegate = null;
// other lines deleted for brevity
#endregion Local Delegate Instances
#region Local Queues for Delegates
private ConcurrentQueue<PlanSelectionChangedEventArgs> _planSelectionChangedQueue
= new ConcurrentQueue<PlanSelectionChangedEventArgs>();
// other lines deleted for brevity
#endregion Local Queues for Delegates
#region Constructor
public UI(OnPlanSelectionChangedDelegate onPlanSelectionChanged)
{
_onPlanSelectionChangedDelegate = onPlanSelectionChanged;
// other lines deleted for brevity
ThreadPool.QueueUserWorkItem(new WaitCallback(DoWork), null);
}
#endregion Constructor
#region Public Methods
public void Shutdown()
{
_continue = false;
}
public void SendPlanSelection(PlanSelectionChangedEventArgs e)
{
if (_planSelectionChangedQueue.Count < _threashold)
{
if (_cntPlanSelectionDropped > 0)
{
e.Dropped = _cntPlanSelectionDropped;
}
_planSelectionChangedQueue.Enqueue(e);
_cntPlanSelectionDropped = 0;
}
else
{
_cntPlanSelectionDropped++;
}
}
// other lines deleted for brevity
#endregion Public Methods
#region Private Asychronous Method
private void DoWork(object dummy)
{
PlanSelectionChangedEventArgs planSelectionChangedEventArgs = null;
while (_continue) // process this loop until told to quit
{
// Plan Selection Changed
// Try to get the next event args in a thread safe way
if (_planSelectionChangedQueue.TryDequeue(out planSelectionChangedEventArgs))
{
// We got an event args from the queue, do we have a valid delegate?
if (_onPlanSelectionChangedDelegate != null)
{
// We have a delegate, call it with the event args and rais the event
_onPlanSelectionChangedDelegate(planSelectionChangedEventArgs);
}
}
// other lines deleted for brevity
}
}
#endregion Private Asychronous Method
}