我想问你一个关于我们实际面临的架构问题的问题。
- 我们有一个分布在多个服务器上的应用程序
- 我们通过activeMQ消费数据。
- 我们有一个名为 Price 的对象需要保持同步。
- 这个对象可以被多次接收。
- 价格有一个名为“lastUpdate”的字段,我们需要使用最新的“lastUpdate”存储/更新价格。
- 不保证最后收到的价格比上一个价格具有最新的 lastUpdate。
为了让一切保持同步,我们有 3 种方法(我们必须使用所有这些方法)
- 我们可以接收价格的异步更新/插入(使用监听器)
- 每 15 分钟,我们将要求生产者将过去 15 分钟内更改的所有价格发送给我们(计划任务)
- 我们每 1 小时更新一次所有价格(schduledTask)
使用的技术:
- ActiveMQ 作为总线
- Hazelcast 作为分布式地图在插入之前进行同步
- JPA
现在如何工作:每次添加/更新异步价格时,我们都会执行:
这就是我们要更新/保留的每个价格所做的事情(单个和批量更新/插入方法都调用这个)
private void priceAddedOrUpdated(Price retrievedPrice)
{
Date dateInTheMap = hazelcastPriceMap.get(retrievedPrice.getId());
if(retrievedPrice.getLastUpdate>dateInTheMap(
{
//doInTransacion
try{
hazelcastPriceMap.lock(retrievedPrice.getId())
//do some logic including
persist the price
hazelcastPriceMap.put(retrievedPrice.getId(),retrievedPrice.getLastUpdate())
}
finally{
//release the lock
}
}
}
问题是任务需要很长时间(30/40 秒)才能完成,而我们希望在 4/5 秒内完成(平均处理 10 万个价格)。我们使用的逻辑似乎没有任何方法可以提高性能。所以我认为我们需要改变保持数据同步的方式......有什么建议吗?
编辑:
按照 pveentjer 的建议,我扩展了 EntryProcessor 类,以便可以在构造函数中传递要更新的价格地图:
public class PriceEntryProcessor implements EntryProcessor, EntryBackupProcessor, Serializable {
Map<Long, Price> priceMap;
public PriceEntryProcessor(Map<Long, Price> priceMap)
{
super();
this.priceMap = priceMap;
}
public Object process(Map.Entry entry)
{//get the price from the map for the entry and do the logic/db insertion
}
我看到在 EntryProcessor 中我们可以只发送键值。但是我们只使用 price.getId() 作为键。