首先,请注意,在 .NET/基类库中,事件订阅者通常有义务确保其回调代码在正确的线程上执行。这对事件生产者来说很容易:它可能只是触发它的事件,而不必关心它的各个订阅者的任何线程关联性。
这是一个可能实现的完整示例分步说明。
让我们从简单的事情开始:Producer
类及其事件Event
。我的示例不包括触发此事件的方式和时间:
class Producer
{
public event EventHandler Event; // raised e.g. with `Event(this, EventArgs.Empty);`
}
接下来,我们希望能够为我们的Consumer
实例订阅此事件并在特定线程上被回调(我将这种线程称为“工作线程”):
class Consumer
{
public void SubscribeToEventOf(Producer producer, WorkerThread targetWorkerThread) {…}
}
我们如何实现这一点?
首先,我们需要将代码“发送”到特定工作线程的方法。由于无法强制线程随时执行特定方法,因此您必须安排工作线程显式等待工作项。一种方法是通过工作项队列。这是一个可能的实现WorkerThread
:
sealed class WorkerThread
{
public WorkerThread()
{
this.workItems = new Queue<Action>();
this.workItemAvailable = new AutoResetEvent(initialState: false);
new Thread(ProcessWorkItems) { IsBackground = true }.Start();
}
readonly Queue<Action> workItems;
readonly AutoResetEvent workItemAvailable;
public void QueueWorkItem(Action workItem)
{
lock (workItems) // this is not extensively tested btw.
{
workItems.Enqueue(workItem);
}
workItemAvailable.Set();
}
void ProcessWorkItems()
{
for (;;)
{
workItemAvailable.WaitOne();
Action workItem;
lock (workItems) // dito, not extensively tested.
{
workItem = workItems.Dequeue();
if (workItems.Count > 0) workItemAvailable.Set();
}
workItem.Invoke();
}
}
}
这个类基本上启动了一个线程,并将它放入一个无限循环中,WaitOne
直到一个项目到达它的队列(workItems
)。一旦发生这种情况,项目 - an Action
- 就会出列并调用。然后线程再次进入睡眠状态 ( WaitOne
)) 直到队列中有另一个项目可用。
Action
QueueWorkItem
s 通过该方法放入队列中。WorkerThread
所以本质上,我们现在可以通过调用该方法将要执行的代码发送到特定实例。我们现在准备实施Customer.SubscribeToEventOf
:
class Consumer
{
public void SubscribeToEventOf(Producer producer, WorkerThread targetWorkerThread)
{
producer.Event += delegate(object sender, EventArgs e)
{
targetWorkerThread.QueueWorkItem(() => OnEvent(sender, e));
};
}
protected virtual void OnEvent(object sender, EventArgs e)
{
// this code is executed on the worker thread(s) passed to `Subscribe…`.
}
}
瞧!
PS(未详细讨论):作为附加组件,您可以将发送代码的方法打包为WorkerThread
使用称为 a 的标准 .NET 机制SynchronizationContext
:
sealed class WorkerThreadSynchronizationContext : SynchronizationContext
{
public WorkerThreadSynchronizationContext(WorkerThread workerThread)
{
this.workerThread = workerThread;
}
private readonly WorkerThread workerThread;
public override void Post(SendOrPostCallback d, object state)
{
workerThread.QueueWorkItem(() => d(state));
}
// other overrides for `Send` etc. omitted
}
在 的开头WorkerThread.ProcessWorkItems
,您将为该特定线程设置同步上下文,如下所示:
SynchronizationContext.SetSynchronizationContext(
new WorkerThreadSynchronizationContext(this));