0

我有一个应用程序从 API 异步接收事件,并且可以在这个 API 上同步调用方法。

出于线程安全的目的,我需要锁定应用程序中的每个同步函数和每个事件处理程序。

但是,同步调用 API 方法可能会导致 API 在不同的线程上引发事件,并在返回之前等待它们被处理。

因此,这可能会导致死锁,因为 API 将等待要处理的事件继续进行,但在我的类中,同步对象将被两个不同的线程命中,程序将挂起。

我目前的想法是,尝试锁定而不是锁定事件处理程序,如果不可能(例如,如果事件是由另一个线程上的同步调用产生的)在队列/消息泵中缓冲事件。

在释放同步函数调用的锁之前,我会调用一个ProcessPendingEvents()函数,以便可以在没有死锁的情况下处理事件。

对于这种情况,你有什么设计模式可以推荐吗?我对任何事情都持开放态度。

这是一个简单的示例来说明我当前的暂定实现。我的真正目标是拥有一个尽可能以单线程方式运行的类:

class APIAdapter {
    readonly object AdapterLock = new object();
    private readonly ConcurrentQueue<Tuple<object, EventArgs>> PendingEvents = new ConcurrentQueue<Tuple<object, EventArgs>>();
    ExternalAPI API = new ExternalAPI();

    APIAdapter() {
        ExternalAPI.Data += ExternalAPI_Data;
    }

    public void RequestData() {
        lock (this.AdapterLock) {
            this.ExternalAPI.SynchronousDataRequest(); //Will cause the API to raise the Data event would therefore deadlock if I had a simple lock() in ExternalAPI_Data.
            this.ProcessPendingEvents();
        }
    }

    private void ExternalAPI_Data(object sender, EventArgs e) {
        if (!Monitor.TryEnter(this.AdapterLock)) {
            this.PendingEvents.Enqueue(Tuple.Create(sender, e));
            return;
        }
        Console.Write("Received event.");
        Monitor.Exit(this.AdapterLock);
    }

    private void ProcessPendingEvents() {
        Tuple<object, EventArgs> ev;
        while (this.PendingEvents.TryDequeue(out ev)) {
            ExternalAPI_Data(ev.Item1, ev.Item2);
        }
    }
}
4

1 回答 1

0

我最初的解决方案并不令人满意:after ProcessPendingEvents()已完成,但在释放锁之前,其他事件可能会被缓冲,直到下一次调用ProcessPendingEvents().

如果 API 在不同的线程上发回两个事件,事件也可以随时被缓冲,而且我非常缺乏在锁被释放后立即使用这些事件的方法。

我最终实现了更清洁的生产者/消费者模式来控制何时处理 API 事件,使用BlockingCollection. 下面是对应的代码,有兴趣的可以参考一下:

class APIAdapter {
    readonly object AdapterLock = new object();
    private readonly BlockingCollection<Tuple<object, EventArgs>> PendingEvents = new BlockingCollection<Tuple<object, EventArgs>>();
    ExternalAPI API = new ExternalAPI();

    APIAdapter() {
        ExternalAPI.Data += ExternalAPI_Data;
        Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }

    public void Consume() {
        foreach (var e in this.PendingEvents.GetConsumingEnumerable()) {
            if (this.PendingEvents.IsAddingCompleted) return;
            ProcessData(e.Item1, e.Item2);
        }
    }

    public void RequestData() {
        lock (this.AdapterLock) {
            this.ExternalAPI.SynchronousDataRequest(); 
        }
    }

    private void ExternalAPI_Data(object sender, EventArgs e) {
        this.PendingEvents.Add(Tuple.Create(sender, e));
    }

    private void ProcessData(object sender, EventArgs e) {
        lock (this.AdapterLock) {
            Console.Write("Received event.");
        }
    }
}
于 2013-11-27T08:21:32.150 回答