我在消息计数器上具有相当高的吞吐量(每秒数万),并且正在寻找一种有效的方法来获取计数,而无需在任何地方放置锁,或者理想情况下,当我每 10 秒进行一次更新时,不锁定每个消息计数。
使用不可变计数器对象
我正在使用一个不可变的计数器类:
public class Counter
{
public Counter(int quotes, int trades)
{
Quotes = quotes;
Trades = trades;
}
readonly public int Quotes;
readonly public int Trades;
// and some other counter fields snipped
}
并且会在每个消息处理循环上更新它:
class MyProcessor
{
System.Timers.Timer timer;
Counter counter = new Counter(0,0);
public MyProcessor()
{
// update ever 10 seconds
this.timer = new System.Timers.Timer(10000);
timer.Elapsed += (sender, e) => {
var quotesPerSecond = this.counter.Quotes / 10.0;
var tradesPerSecond = this.counter.Trades / 10.0;
this.Counter = new Counter(0,0);
});
}
public void ProcessMessages(Messages messages)
{
foreach(var message in messages) { /* */ }
var oldCounter = counter;
this.counter = new Counter(oldCounter.Quotes, oldCounter.Trades);
}
}
我有很多柜台(未全部显示),因此意味着Interlocked.Increment
在各个柜台字段上有很多单独的电话。
我能想到的唯一另一种方法是锁定每一次运行ProcessMessages
(这将是广泛的)并且对于作为实用程序的东西而不是在程序崩溃的关键位置上很重。
当我们只需要每 10 秒更新一次时,是否可以在没有硬互锁/线程机制的情况下以这种方式使用不可变计数器对象?
标记检查想法以避免锁定
计时器线程是否可以设置一个标志以供ProcessMessages
检查,如果它看到它设置,则再次从零开始计数,即
/* snipped the MyProcessor class, same as before */
System.Timers.Timer timer;
Counter counter = new Counter(0,0);
ManualResetEvent reset = new ManualResetEvent(false);
public MyProcessor()
{
// update ever 10 seconds
this.timer = new System.Timers.Timer(10000);
timer.Elapsed += (sender, e) => {
var quotesPerSecond = this.counter.Quotes / 10.0;
var tradesPerSecond = this.counter.Trades / 10.0;
// log
this.reset.Set();
});
}
// this should be called every second with a heartbeat message posted to queue
public void ProcessMessages(Messages messages)
{
if (reset.WaitOne(0) == true)
{
this.counter = new Counter(this.counter.Quotes, this.counter.Trades, this.counter.Aggregates);
reset.Reset();
}
else
{
this.counter = new Counter(
this.counter.Quotes + message.Quotes.Count,
this.counter.Trades + message.Trades.Count);
}
}
/* end of MyProcessor class */
这会起作用,但是当进程消息停止时更新“停止”(尽管吞吐量非常高,但它确实会在晚上暂停几个小时,理想情况下应该显示实际值而不是最后一个值)。
MyProcessor.ProcessMessages()
解决此问题的一种方法是每秒发布一条心跳消息,以强制在reset
设置 ManualResetEvent 时对消息计数器进行内部更新和随后的重置。