0

我想问你一个关于我们实际面临的架构问题的问题。

  • 我们有一个分布在多个服务器上的应用程序
  • 我们通过activeMQ消费数据。
  • 我们有一个名为 Price 的对象需要保持同步。
  • 这个对象可以被多次接收。
  • 价格有一个名为“lastUpdate”的字段,我们需要使用最新的“lastUpdate”存储/更新价格。
  • 不保证最后收到的价格比上一个价格具有最新的 lastUpdate。

为了让一切保持同步,我们有 3 种方法(我们必须使用所有这些方法)

  • 我们可以接收价格的异步更新/插入(使用监听器)
  • 每 15 分钟,我们将要求生产者将过去 15 分钟内更改的所有价格发送给我们(计划任务)
  • 我们每 1 小时更新一次所有价格(schduledTask)

使用的技术:

  • ActiveMQ 作为总线
  • Hazelcast 作为分布式地图在插入之前进行同步
  • JPA

现在如何工作:每次添加/更新异步价格时,我们都会执行:

这就是我们要更新/保留的每个价格所做的事情(单个和批量更新/插入方法都调用这个)

private void priceAddedOrUpdated(Price retrievedPrice)
{
    Date dateInTheMap = hazelcastPriceMap.get(retrievedPrice.getId());
    if(retrievedPrice.getLastUpdate>dateInTheMap(
    {
        //doInTransacion
        try{
            hazelcastPriceMap.lock(retrievedPrice.getId())
            //do some logic including
            persist the price
            hazelcastPriceMap.put(retrievedPrice.getId(),retrievedPrice.getLastUpdate())
        }
        finally{
             //release the lock
        }
    }
}

问题是任务需要很长时间(30/40 秒)才能完成,而我们希望在 4/5 秒内完成(平均处理 10 万个价格)。我们使用的逻辑似乎没有任何方法可以提高性能。所以我认为我们需要改变保持数据同步的方式......有什么建议吗?

编辑:

按照 pveentjer 的建议,我扩展了 EntryProcessor 类,以便可以在构造函数中传递要更新的价格地图:

    public class PriceEntryProcessor implements EntryProcessor, EntryBackupProcessor, Serializable {

    Map<Long, Price> priceMap;

    public PriceEntryProcessor(Map<Long, Price> priceMap)
    {
        super();
        this.priceMap = priceMap;
    }
    public Object process(Map.Entry entry)
    {//get the price from the map for the entry and do the logic/db insertion

}

我看到在 EntryProcessor 中我们可以只发送键值。但是我们只使用 price.getId() 作为键。

4

1 回答 1

0

一种潜在的加速可能是使用 EntryProcessor。

在 EntryProcessor 中,您可以免费获得锁定,因为您可以保证没有其他进程在同一分区中运行。您还移动到 EntryProcessor 的数据库逻辑。这会将远程处理减少一半。

你在做批处理吗?因为这可能会产生巨大的差异。因此,单个分区的批处理(例如 100 次更改)一旦收到,就使用入口处理器一次性处理整个批处理。

如果没有批处理,您将有 100 x(1 个锁定、1 个解锁、1 个获取、1 个放置)=400 个操作。如果批处理 100,您将拥有 1 个远程操作。所以减少了 400 倍(忽略复制)。

于 2015-11-08T07:53:43.460 回答