9

考虑两个类;ProducerConsumer(与经典模式相同,每个都有自己的线程)。是否有可能Producer有一个Event可以Consumer注册的对象,当生产者触发事件时,消费者的事件处理程序在自己的线程中运行?以下是我的假设:

  • Consumer不知道Producer's 事件是在他自己的线程中触发还是在另一个线程中触发。

  • 既不Producer是也不Consumer是后代,Control因此他们没有 BeginInvoke继承方法。

PS。我不是想实现Producer-Consumer模式。这是两个简单的类,我正在尝试重构生产者,使其包含线程。

[更新]

为了进一步扩展我的问题,我试图以最简单的方式包装要使用的硬件驱动程序。例如,我的包装器将有一个StateChanged事件,主应用程序将注册到该事件,以便在硬件断开连接时通知它。由于实际的驱动程序除了轮询来检查它的存在之外别无他法,我需要启动一个线程来定期检查它。一旦它不再可用,我将触发需要在添加时在同一线程中执行的事件。我知道这是一个经典的生产者-消费者模式,但由于我试图简化使用我的驱动程序包装器,我不希望用户代码实现消费者。

[更新]

由于一些评论表明这个问题没有解决方案,我想添加几行可能会改变他们的想法。考虑到BeginInvoke可以做我想做的事,所以这应该不是不可能的(至少在理论上)。实现我自己的BeginInvoke并在其中调用它Producer是一种看待它的方式。就是不知道怎么弄BeginInvoke

4

5 回答 5

5

你想做线程间通信。是的,有可能。使用 System.Windows.Threading.Dispatcher http://msdn.microsoft.com/en-us/library/system.windows.threading.dispatcher.aspx

Dispatcher 为特定线程维护工作项的优先队列。当在线程上创建 Dispatcher 时,它成为唯一可以与线程关联的 Dispatcher,即使 Dispatcher 已关闭。如果您尝试获取当前线程的 CurrentDispatcher 并且 Dispatcher 未与该线程关联,则会创建一个 Dispatcher。当您创建 DispatcherObject 时,也会创建一个 Dispatcher。如果在后台线程上创建 Dispatcher,请确保在退出线程之前关闭 Dispatcher。

于 2013-08-02T08:34:39.323 回答
4

是的,有办法做到这一点。它依赖于使用SynchronizationContext类(docs)。Send同步上下文通过方法(调用线程同步)和Post(调用线程异步)将消息从一个线程发送到另一个线程的操作抽象化。

让我们采取一个稍微简单的情况,您只想要捕获一个同步上下文,即“创建者”线程的上下文。你会做这样的事情:

using System.Threading;

class HardwareEvents
{
    private SynchronizationContext context;
    private Timer timer;

    public HardwareEvents() 
    {
       context = SynchronizationContext.Current ?? new SynchronizationContext();
       timer = new Timer(TimerMethod, null, 0, 1000); // start immediately, 1 sec interval.
    }

     private void TimerMethod(object state)
     {
         bool hardwareStateChanged = GetHardwareState();
         if (hardwareStateChanged)
             context.Post(s => StateChanged(this, EventArgs.Empty), null); 
     }

     public event EventHandler StateChanged;

     private bool GetHardwareState()
     {
        // do something to get the state here.
        return true;
     }
}

现在,调用事件时将使用创建线程的同步上下文。如果创建线程是 UI 线程,它将具有由框架提供的同步上下文。如果没有同步上下文,则使用默认实现,它在线程池上调用。SynchronizationContext是一个类,如果您想提供一种自定义方式将消息从生产者发送到消费者线程,则可以将其子类化。只需覆盖PostSend发送所述消息。

如果您希望每个事件订阅者都在他们自己的线程上被回调,您必须在add方法中捕获同步上下文。然后,您保留成对的同步上下文和委托。然后在引发事件时,您将Post依次循环遍历同步上下文/委托对和每对。

还有其他几种方法可以改善这一点。例如,如果事件没有订阅者,您可能希望暂停轮询硬件。或者,如果硬件没有响应,您可能希望降低轮询频率。

于 2013-08-02T09:50:22.757 回答
3

首先,请注意,在 .NET/基类库中,事件订阅者通常有义务确保其回调代码在正确的线程上执行。这对事件生产者来说很容易:它可能只是触发它的事件,而不必关心它的各个订阅者的任何线程关联性。

这是一个可能实现的完整示例分步说明。

让我们从简单的事情开始:Producer类及其事件Event。我的示例不包括触发此事件的方式和时间:

class Producer
{
    public event EventHandler Event;  // raised e.g. with `Event(this, EventArgs.Empty);`
}

接下来,我们希望能够为我们的Consumer实例订阅此事件并在特定线程上被回调(我将这种线程称为“工作线程”):

class Consumer
{
    public void SubscribeToEventOf(Producer producer, WorkerThread targetWorkerThread) {…}
}

我们如何实现这一点?

首先,我们需要将代码“发送”到特定工作线程的方法。由于无法强制线程随时执行特定方法,因此您必须安排工作线程显式等待工作项。一种方法是通过工作项队列。这是一个可能的实现WorkerThread

sealed class WorkerThread
{
    public WorkerThread()
    {
        this.workItems = new Queue<Action>();
        this.workItemAvailable = new AutoResetEvent(initialState: false);
        new Thread(ProcessWorkItems) { IsBackground = true }.Start();
    }

    readonly Queue<Action> workItems;
    readonly AutoResetEvent workItemAvailable;

    public void QueueWorkItem(Action workItem)
    {
        lock (workItems)  // this is not extensively tested btw.
        {
            workItems.Enqueue(workItem);
        }
        workItemAvailable.Set();
    }

    void ProcessWorkItems()
    {
        for (;;)
        {
            workItemAvailable.WaitOne();
            Action workItem;
            lock (workItems)  // dito, not extensively tested.
            {
                workItem = workItems.Dequeue();
                if (workItems.Count > 0) workItemAvailable.Set();
            }
            workItem.Invoke();
        }
    }
}

这个类基本上启动了一个线程,并将它放入一个无限循环中,WaitOne直到一个项目到达它的队列(workItems)。一旦发生这种情况,项目 - an Action- 就会出列并调用。然后线程再次进入睡眠状态 ( WaitOne)) 直到队列中有另一个项目可用。

ActionQueueWorkItems 通过该方法放入队列中。WorkerThread所以本质上,我们现在可以通过调用该方法将要执行的代码发送到特定实例。我们现在准备实施Customer.SubscribeToEventOf

class Consumer
{
    public void SubscribeToEventOf(Producer producer, WorkerThread targetWorkerThread)
    {
        producer.Event += delegate(object sender, EventArgs e)
        {
            targetWorkerThread.QueueWorkItem(() => OnEvent(sender, e));
        };
    }

    protected virtual void OnEvent(object sender, EventArgs e)
    {
        // this code is executed on the worker thread(s) passed to `Subscribe…`. 
    }
}

瞧!


PS(未详细讨论):作为附加组件,您可以将发送代码的方法打包为WorkerThread使用称为 a 的标准 .NET 机制SynchronizationContext

sealed class WorkerThreadSynchronizationContext : SynchronizationContext
{
    public WorkerThreadSynchronizationContext(WorkerThread workerThread)
    {
        this.workerThread = workerThread;
    }

    private readonly WorkerThread workerThread;

    public override void Post(SendOrPostCallback d, object state)
    {
        workerThread.QueueWorkItem(() => d(state));
    }

    // other overrides for `Send` etc. omitted
}

在 的开头WorkerThread.ProcessWorkItems,您将为该特定线程设置同步上下文,如下所示:

SynchronizationContext.SetSynchronizationContext(
    new WorkerThreadSynchronizationContext(this)); 
于 2013-08-03T02:10:57.130 回答
1

我之前发布过我去过那里,并且没有很好的解决方案。

Windows.Forms.Timer但是,我只是偶然发现了我之前在另一个上下文中所做的事情:您可以在创建包装器对象时实例化一个计时器(即)。此计时器会将所有Tick事件发布到 ui 线程。

现在,如果您的设备轮询逻辑是非阻塞且快速的,您可以直接在计时器Tick事件中实现它,并在那里引发您的自定义事件。

否则,您可以继续在线程内执行轮询逻辑,而不是在线程内触发事件,您只需翻转一些布尔变量,计时器每 10 毫秒读取一次,然后触发事件。

请注意,此解决方案仍然需要从 GUI 线程创建对象,但至少对象的用户不必担心Invoke.

于 2013-08-02T10:59:05.047 回答
1

有可能的。一种典型的方法是使用BlockingCollection类。这个数据结构像一个普通队列一样工作,只是如果队列为空,出队操作会阻塞调用线程。生产者将通过调用将项目排队Add,消费者将通过调用将它们出列Take。消费者通常运行它自己的专用线程,旋转一个无限循环,等待项目出现在队列中。这或多或少是 UI 线程上的消息循环如何操作,并且是获取InvokeandBeginInvoke操作以完成编组行为的基础。

public class Consumer
{
  private BlockingCollection<Action> queue = new BlockingCollection<Action>();

  public Consumer()
  {
    var thread = new Thread(
      () =>
      {
        while (true)
        {
          Action method = queue.Take();
          method();
        }
      });
    thread.Start();
  }

  public void BeginInvoke(Action method)
  {
    queue.Add(item);
  }
}
于 2013-08-02T14:34:29.967 回答