作为我的 MS 项目的一部分,我正在编写一个金融市场数据分发应用程序。该应用程序即将完成,但该应用程序无法很好地扩展。
这就是它的工作原理,我订阅了一个“假”交易所来获取市场数据。订阅后,我得到一个初始快照,然后我不断收到增量......
1) Subscribe for IBM.
2) Snapshot : Buy:50|Size:1400||Sell:49|Size:1000
(At buy price of 50, 1400 shares and at sell price of 49, 1000 shares available)
3) Update1: -1|10||-2|25
(Buy price is now 49 (50-1), buy Size is 1410. Sell price is 47 and sell size is 1025)
4) Update2 ...
.
.
.
我有一个处理市场数据的类,下面的 dataUpdate() 是“假”交换应用程序在一个线程中调用的回调方法。
class MarketDataProcessor:
Map<String, MarketData> marketDataMap = new HashMap<String, MarketData>(1000);
ConcurrentMap<String, MarketData> distMap = new ConcurrentHashMap<String, MarketData>();
//**Always called by one thread (MarketDataThread)**
public void dataUpdate( MarketDataUpdate dataUpdate ){
try{
//IBM
String symbol = dataUpdate .getSymbol();
MarketData data = marketDataMap.get( symbol );
if ( data = null ){
data = new MarketData( dataUpdate );
}else{
//Apply the delta here and save it ...
marketDataMap.put( symbol, data );
}
distMap.put( symbol, data );
}catch( Exception e ){
LOGGER.warn("Exception while processing market data.");
LOGGER.warn("Exception: {}", e);
}
}
从交易所获取市场数据后,我需要以线程安全的方式分发它。这是不能很好扩展的方法,因为它可以被 20 多个线程调用,并且它使用外部锁来确保原子性。
public final double[] getData( String symbol, Side side ){
double[] dataArray = {0.0, 0.0};
synchronized( LOCK ){
MarketData data = distMap.get( symbol );
dataArray = ( side == BUY ) ? getBuyData(data) : getSellData(data);
}
return dataArray;
}
这是我提出的解决方案是将上述方法一分为二。
//No external lock as it uses a ConcurrentHashMap
public final MarketData getData( String symbol ){
return distMap.get( symbol );
}
//State of this method is now confimed to the
//stack of the calling thread, therefore thread safe.
public final double[] getData( MarketData data, Side side ){
return ( side == BUY ) ? getBuyData(data) : getSellData(data);
}
承认这会改变 api 并使用户调用两种方法而不是一种方法,如果不使用外部锁,它不是线程安全的吗?
谢谢你。