2

我正在从历史数据库接收大约 50 Hz 的数据。我想以 10 Hz 左右的频率将其从另一端流出。为了实现这一点,我生成了两个计时器,一个用于从历史数据库中获取数据(我运行它的速度是发送计时器的两倍)。第二个定时器为 200 毫秒(10 赫兹)。第一个计时器将它获得的每个值都存储在 BlockingCollection 中(我也尝试了 ConcurrentQueue)。由于两个不同的线程读取/写入集合,我无法使用普通的列表/队列。挑战在于它没有给我一个 50Hz 的频率。我希望每次调用都能得到大约 5 个值,但它会给我突发的数据(即有时什么都没有,有时是 15 个)。因此,我将 BlockingCollection 作为发送计时器从中获取数据的缓冲区。发送定时器使用递归的方法来获取列表中间的数据。它检查是否发送了该值,如果已经是(50Hz -> 10Hz),则获取下一个第 5 个值,如果没有发送该值。现在来解决问题;即使我锁定了对象,递归方法有时也会两次发送相同的值。我们已将问题定位为锁定未按预期工作,但我们不知道为什么或如何解决它。

有什么建议么?附加的代码不是真正的代码,但它说明了问题,实际上给你重复的值,虽然不是很频繁。

class Program {

    private readonly ConcurrentDictionary<int, BlockingCollection<TestObject>> _pipes = new ConcurrentDictionary<int, BlockingCollection<TestObject>>();
    private const int Interval = 5;

    private Timer inTimer;
    private Timer outTimer;

    static void Main() {
        Program program = new Program();
        program.Run();
        Console.ReadLine();
    }

    private void Run() {
        _pipes[100] = new BlockingCollection<TestObject>();
        _pipes[200] = new BlockingCollection<TestObject>();
        _pipes[300] = new BlockingCollection<TestObject>();

        inTimer = new Timer(InTimer, null, 0, 100);
        Thread.Sleep(1000);
        outTimer = new Timer(OutTimer, null, 0, 200);
    }

    private void OutTimer(object state) {
        foreach (TestObject testObject in GetNotSentTestObjects()) {
            Console.WriteLine("{0};{1};{2}", testObject.PipeId, testObject.Timestamp.ToString("o"), testObject.Value);
        }
    }

    private IEnumerable<TestObject> GetNotSentTestObjects() {
        List<TestObject> testObjects = new List<TestObject>();

        foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) {
            TestObject testObject = GetLatestTestObjectNotSent(pipe.Key, (pipe.Value.Count / 2));
            if (testObject == null) {
                return null;
            }
            testObjects.Add(testObject);
        }
        return testObjects;
    }

    private TestObject GetLatestTestObjectNotSent(int key, int locationInList) {

        BlockingCollection<TestObject> pipe = _pipes[key];
        TestObject testObject;
        lock (pipe) {
            testObject = pipe.ElementAt(locationInList - 1);

            if (testObject.Sent) {
                int nextLocationInList = locationInList + Interval;
                GetLatestTestObjectNotSent(key, nextLocationInList);
            }
            testObject.Sent = true;
        }
        testObject.PipeId = key;
        return testObject;
    }

    private void InTimer(object sender) {
        Random random = new Random();
        for (int i = 0; i < random.Next(0,20); i++) {
            foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) {
                pipe.Value.Add(new TestObject());
            }
            i++;
        }
    }
}

internal class TestObject {
    public DateTime Timestamp { get; set; }
    public string Value { get; set; }
    public bool Sent { get; set; }
    public int PipeId;

    public TestObject() {
        Value = Guid.NewGuid().ToString().Substring(0, 8);
        Timestamp = DateTime.Now;
    }
}

注意, Thread.Sleep 是让列表被填充。在我的代码中,我也有一段时间后修剪列表的代码。

4

3 回答 3

1

ABlockingCollection是一个队列。它不是为随机访问而设计的,尝试随机访问它会给你带来无穷无尽的麻烦。您也没有从队列中删除项目,这意味着最终您将耗尽内存。

您可能遇到的一个问题是,虽然您在使用 访问列表时锁定了列表,ElementAt但在添加项目时并没有锁定它。但是锁定并发数据结构的整个想法应该让你重新考虑你的设计。您永远不必锁定并发数据结构。

据我了解,您想挑选每 5 个尚未处理的项目。如果您不想丢弃未处理的项目(即项目 1-4),这将是一个相当困难的问题。如果您要丢弃这些项目,那么只需调用Take五次即可:

TestObject o;
for (int i = 0; i < 5; ++i)
{
    o = pipe.Take();
}
// You now have the 5th item from the queue
// The other items have been discarded

如果您想保留这些项目以供以后可能的处理,您必须以某种方式将它们添加回队列。但是它们会排到队列的末尾,这意味着您将在处理较旧的项目之前处理较新的项目。

此外,在某些时候你会填满队列,因为生产者添加东西的速度比消耗它们的速度要快。

您需要更多地考虑您希望此应用程序如何工作,或者更好地向我们解释它。您所描述的内容和您发布的代码......令人困惑,我们无法提供好的建议。

于 2013-03-18T23:56:19.343 回答
0

经过大量的抨击,事实证明,当我使用递归方法时,我需要返回......

于 2013-03-19T08:59:44.917 回答
0

同一个线程可以多次获取同一个锁。看这个例子

object o = new object();
lock (o) lock (o) Console.WriteLine("Acquired lock two times");

您应该使用其他同步原语,如 Mutex、Semaphore、AutoResetEvent 等...

于 2013-03-18T20:33:26.913 回答