1

我需要构建一个应用程序,其中一些对象实例同时生成“脉冲”。(本质上这只是意味着他们正在增加一个计数器。)我还需要跟踪每个对象的总计数器。此外,每当我对计数器执行读取操作时,都需要将其重置为零。

所以我在和一个工作的人交谈,他提到了 Retlang 和基于消息的并发,这听起来非常有趣。但显然我对这个概念很陌生。因此,我构建了一个小型原型,并获得了预期的结果,这非常棒 - 但由于我对 Retlang 和一般的并发编程。

首先,我有这些课程:

public class Plc {

        private readonly IChannel<Pulse> _channel;
        private readonly IFiber _fiber;
        private readonly int _pulseInterval;
        private readonly int _plcId;

        public Plc(IChannel<Pulse> channel, int plcId, int pulseInterval) {
            _channel = channel;
            _pulseInterval = pulseInterval;
            _fiber = new PoolFiber();
            _plcId = plcId;
        }

        public void Start() {
            _fiber.Start();
            // Not sure if it's safe to pass in a delegate which will run in an infinite loop...
            // AND use a shared channel object...
            _fiber.Enqueue(() => {
                SendPulse();
            });
        }

        private void SendPulse() {
            while (true) {
                // Not sure if it's safe to use the same channel object in different
                // IFibers...
                _channel.Publish(new Pulse() { PlcId = _plcId });
                Thread.Sleep(_pulseInterval);
            }
        }
    }

    public class Pulse {
        public int PlcId { get; set; }
    }

这里的想法是我可以实例化多个 Plc,将每个 Plc 传递给相同的 IChannel,然后让它们同时执行 SendPulse 函数,这将允许每个 Plc 发布到同一个通道。但正如你从我的评论中看到的那样,我有点怀疑我所做的是否真的合法。我主要担心在不同 IFiber 的上下文中使用相同的 IChannel 对象发布,但我也担心永远不会从传递给 Enqueue 的委托返回。我希望有人可以就我应该如何处理这个问题提供一些见解。

此外,这里是“订阅者”类:

public class PulseReceiver {

        private int[] _pulseTotals;
        private readonly IFiber _fiber;
        private readonly IChannel<Pulse> _channel;
        private object _pulseTotalsLock;

        public PulseReceiver(IChannel<Pulse> channel, int numberOfPlcs) {
            _pulseTotals = new int[numberOfPlcs];
            _channel = channel;
            _fiber = new PoolFiber();
            _pulseTotalsLock = new object();
        }

        public void Start() {
            _fiber.Start();
            _channel.Subscribe(_fiber, this.UpdatePulseTotals);
        }

        private void UpdatePulseTotals(Pulse pulse) {
            // This occurs in the execution context of the IFiber.
            // If we were just dealing with the the published Pulses from the channel, I think
            // we wouldn't need the lock, since I THINK the published messages would be taken
            // from a queue (i.e. each Plc is publishing concurrently, but Retlang enqueues
            // the messages).
            lock(_pulseTotalsLock) {

                _pulseTotals[pulse.PlcId - 1]++;
            }
        }

        public int GetTotalForPlc(int plcId) {
            // However, this access takes place in the application thread, not in the IFiber,
            // and I think there could potentially be a race condition here.  I.e. the array
            // is being updated from the IFiber, but I think I'm reading from it and resetting values
            // concurrently in a different thread.
            lock(_pulseTotalsLock) {

                if (plcId <= _pulseTotals.Length) {

                    int currentTotal = _pulseTotals[plcId - 1];
                    _pulseTotals[plcId - 1] = 0;
                    return currentTotal;
                }
            }

            return -1;
        }
    }

所以在这里,我重用了分配给 Plc 实例的相同 IChannel,但有不同的 IFiber 订阅它。理想情况下,我可以接收来自每个 Plc 的消息,并更新我班级中的单个私有字段,但以线程安全的方式。

据我了解(并且我在评论中提到),我认为我可以安全地简单地更新我提供给订阅函数的委托中的 _pulseTotals 数组,因为我会连续接收来自 Plcs 的每条消息。

但是,我不确定如何最好地处理需要读取总数并重置它们的位。正如您从代码和注释中看到的那样,我最终为对 _pulseTotals 数组的任何访问加了一个锁。但我不确定这是否有必要,我很想知道a)实际上是否有必要这样做,为什么这样做,或者b)实现类似事情的正确方法。

最后,为了更好地衡量,这是我的主要功能:

static void Main(string[] args) {

        Channel<Pulse> pulseChannel = new Channel<Pulse>();

        PulseReceiver pulseReceiver = new PulseReceiver(pulseChannel, 3);
        pulseReceiver.Start();

        List<Plc> plcs = new List<Plc>() {
            new Plc(pulseChannel, 1, 500),
            new Plc(pulseChannel, 2, 250),
            new Plc(pulseChannel, 3, 1000)
        };

        plcs.ForEach(plc => plc.Start());

        while (true) {
            Thread.Sleep(10000);
            Console.WriteLine(string.Format("Plc 1: {0}\nPlc 2: {1}\nPlc 3: {2}\n", pulseReceiver.GetTotalForPlc(1), pulseReceiver.GetTotalForPlc(2), pulseReceiver.GetTotalForPlc(3)));
        }
    }

我实例化一个单独的 IChannel,将其传递给所有内容,其中 Receiver 在内部使用 IFiber 订阅,而 Plcs 使用 IFibers 来“排队”一个不断发布到频道的非返回方法。

同样,控制台输出看起来与我期望的完全一样,即等待 10 秒后,我看到 Plc 1 的 20 个“脉冲”。并且在读取后重置计数器似乎也有效,即 Plc 1 在每 10 秒增量后有 20 个“脉冲”。但这并不能让我放心,我没有忽略一些重要的事情。

我真的很高兴能更多地了解 Retlang 和并发编程技术,所以希望有人有时间筛选我的代码并针对我的具体问题提供一些建议,或者甚至根据我的要求提供不同的设计!

4

0 回答 0