2

作为我的 MS 项目的一部分,我正在编写一个金融市场数据分发应用程序。该应用程序即将完成,但该应用程序无法很好地扩展。

这就是它的工作原理,我订阅了一个“假”交易所来获取市场数据。订阅后,我得到一个初始快照,然后我不断收到增量......

1) Subscribe for IBM.
2) Snapshot : Buy:50|Size:1400||Sell:49|Size:1000
(At buy price of 50, 1400 shares and at sell price of 49, 1000 shares available)
3) Update1: -1|10||-2|25
(Buy price is now 49 (50-1), buy Size is 1410. Sell price is 47 and sell size is 1025)
4) Update2 ...
.
.
.

我有一个处理市场数据的类,下面的 dataUpdate() 是“假”交换应用程序在一个线程中调用的回调方法。

class MarketDataProcessor:
Map<String, MarketData> marketDataMap      = new HashMap<String, MarketData>(1000);
ConcurrentMap<String, MarketData> distMap  = new ConcurrentHashMap<String, MarketData>();

//**Always called by one thread (MarketDataThread)**
    public void dataUpdate( MarketDataUpdate dataUpdate ){

            try{
                 //IBM
                 String symbol   = dataUpdate .getSymbol();             
                 MarketData data = marketDataMap.get( symbol );

                 if ( data = null ){
                    data = new MarketData( dataUpdate );
                 }else{
                    //Apply the delta here and save it ...
                    marketDataMap.put( symbol, data );
                 }

                 distMap.put( symbol, data );

            }catch( Exception e ){
               LOGGER.warn("Exception while processing market data.");
               LOGGER.warn("Exception: {}", e);
            }
    }

从交易所获取市场数据后,我需要以线程安全的方式分发它。这是不能很好扩展的方法,因为它可以被 20 多个线程调用,并且它使用外部锁来确保原子性。

public final double[] getData( String symbol, Side side ){
     double[] dataArray = {0.0, 0.0};

     synchronized( LOCK ){
        MarketData data = distMap.get( symbol );
        dataArray       = ( side == BUY ) ? getBuyData(data) : getSellData(data); 
     }

  return dataArray;
}

这是我提出的解决方案是将上述方法一分为二。

//No external lock as it uses a ConcurrentHashMap
public final MarketData getData( String symbol ){
       return distMap.get( symbol );
}

//State of this method is now confimed to the
//stack of the calling thread, therefore thread safe.
public final double[] getData( MarketData data, Side side ){
       return ( side == BUY ) ? getBuyData(data) : getSellData(data); 
}

承认这会改变 api 并使用户调用两种方法而不是一种方法,如果不使用外部锁,它不是线程安全的吗?

谢谢你。

4

3 回答 3

2

将许多线程转储到一个需要时间的同步线程中并不是一个好主意。

您能否将这些结果转储到同步队列中以供稍后分析?单个线程可以将项目从队列中拉出,而许多线程将其放入。

于 2013-03-14T21:29:14.680 回答
1

首先,我不知道您的 LOCK 控制访问的资源是什么。你的代码的哪一部分是不安全的,为什么你认为使用 LOCK 有帮助?您的新方法也没有提供与原始方法相同的签名。您是否希望人们像使用它一样使用它

getData(getData(symbol), side)

?

如果没有锁的原始代码不是线程安全的,那么您的新方法与此没有什么不同。

回到你原来的实现:你在 LOCK 保护的代码段中访问的唯一资源是 distMap。没有其他地方正在使用LOCK。因此,您在这里所做的是您只需要 1 个线程从 distMap获取数据。但是我看不出这样做的理由:首先 distMap 是一个 ConcurrentMap,它是线程安全的,可以被多个线程访问。其次,对于每一种 Map,简单地获取数据总是线程安全的,因为没有人改变状态。通常你需要执行同步控制的应该包括状态改变逻辑,这是你在地图中放置数据的地方(尽管在这段特定的代码中,没有理由添加额外的同步控制),但你没有这样做. 我根本无法理解您的 LOCK 的全部目的。

对您的一些评论:只是盲目地使用同步/锁定等不会自动将您的代码更改为线程安全的。您需要知道什么不是线程安全的,并相应地进行同步控制。另一条评论是:将非原子方法分解为原子片段不会使您的代码成为线程安全的。换一种方式想一想:Java 中几乎所有最低级别的操作都是原子的。这是否意味着您的代码是自动线程安全的,因为您正在调用一堆原子操作?明显不是。

于 2013-03-15T03:09:25.363 回答
0

虽然其他答案提供了很好的建议,但还有另一个可用的选项是无锁的:使用持久映射(如来自 pcollections)。读取器线程将始终拥有一致的数据快照;编写器线程将创建地图的新版本,但不会复制所有数据。

于 2013-03-15T03:33:54.723 回答