我正在从历史数据库接收大约 50 Hz 的数据。我想以 10 Hz 左右的频率将其从另一端流出。为了实现这一点,我生成了两个计时器,一个用于从历史数据库中获取数据(我运行它的速度是发送计时器的两倍)。第二个定时器为 200 毫秒(10 赫兹)。第一个计时器将它获得的每个值都存储在 BlockingCollection 中(我也尝试了 ConcurrentQueue)。由于两个不同的线程读取/写入集合,我无法使用普通的列表/队列。挑战在于它没有给我一个 50Hz 的频率。我希望每次调用都能得到大约 5 个值,但它会给我突发的数据(即有时什么都没有,有时是 15 个)。因此,我将 BlockingCollection 作为发送计时器从中获取数据的缓冲区。发送定时器使用递归的方法来获取列表中间的数据。它检查是否发送了该值,如果已经是(50Hz -> 10Hz),则获取下一个第 5 个值,如果没有发送该值。现在来解决问题;即使我锁定了对象,递归方法有时也会两次发送相同的值。我们已将问题定位为锁定未按预期工作,但我们不知道为什么或如何解决它。
有什么建议么?附加的代码不是真正的代码,但它说明了问题,实际上给你重复的值,虽然不是很频繁。
class Program {
private readonly ConcurrentDictionary<int, BlockingCollection<TestObject>> _pipes = new ConcurrentDictionary<int, BlockingCollection<TestObject>>();
private const int Interval = 5;
private Timer inTimer;
private Timer outTimer;
static void Main() {
Program program = new Program();
program.Run();
Console.ReadLine();
}
private void Run() {
_pipes[100] = new BlockingCollection<TestObject>();
_pipes[200] = new BlockingCollection<TestObject>();
_pipes[300] = new BlockingCollection<TestObject>();
inTimer = new Timer(InTimer, null, 0, 100);
Thread.Sleep(1000);
outTimer = new Timer(OutTimer, null, 0, 200);
}
private void OutTimer(object state) {
foreach (TestObject testObject in GetNotSentTestObjects()) {
Console.WriteLine("{0};{1};{2}", testObject.PipeId, testObject.Timestamp.ToString("o"), testObject.Value);
}
}
private IEnumerable<TestObject> GetNotSentTestObjects() {
List<TestObject> testObjects = new List<TestObject>();
foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) {
TestObject testObject = GetLatestTestObjectNotSent(pipe.Key, (pipe.Value.Count / 2));
if (testObject == null) {
return null;
}
testObjects.Add(testObject);
}
return testObjects;
}
private TestObject GetLatestTestObjectNotSent(int key, int locationInList) {
BlockingCollection<TestObject> pipe = _pipes[key];
TestObject testObject;
lock (pipe) {
testObject = pipe.ElementAt(locationInList - 1);
if (testObject.Sent) {
int nextLocationInList = locationInList + Interval;
GetLatestTestObjectNotSent(key, nextLocationInList);
}
testObject.Sent = true;
}
testObject.PipeId = key;
return testObject;
}
private void InTimer(object sender) {
Random random = new Random();
for (int i = 0; i < random.Next(0,20); i++) {
foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) {
pipe.Value.Add(new TestObject());
}
i++;
}
}
}
internal class TestObject {
public DateTime Timestamp { get; set; }
public string Value { get; set; }
public bool Sent { get; set; }
public int PipeId;
public TestObject() {
Value = Guid.NewGuid().ToString().Substring(0, 8);
Timestamp = DateTime.Now;
}
}
注意, Thread.Sleep 是让列表被填充。在我的代码中,我也有一段时间后修剪列表的代码。