1

我想使用 hazelcast 或 coherence EntryProcessor 在密钥存储在缓存中的不同节点上处理一些并行执行的逻辑。我看到我可以使用类似 sendToEachKey(EntryProcessor 进程)的东西。

当我还需要使用逻辑发送一条属于另一个系统的要处理的数据并且我接收到它(例如在 http 请求中)时,我的问题就出现了。

当然我可以做类似 sendToEachKey(EntryProcessor(data) process) 的事情。但是,如果数据与每个键不同,并且我只想将他的数据发送到特定键以进行处理,我该怎么做?我之所以要这样做是因为数据太大而且我的网络过载。

当然,如果我打开一个线程池将每个数据发送到每个键是可能的,但由于请求量很大,它效率低下。

谢谢!

4

3 回答 3

1

对于 Hazelcast,您可以检索所有值并发送它自己的每个键EntryProcessor,但这会产生很多开销。

另一种选择是结合使用EntryProcessor和我们的分布式ExecutorService.

你发送一个Runnable到 ExecutorService。在内部Runnable检索本地键集,检索所有外部值(所有已经在节点本地的值),然后为EntryProcessor每个本地键发出一个。由于您已经在节点本地,因此没有更多的流量飞来飞去(除了备份,显然:))。也就是说,您可能希望实现一个EntryProcessor仅传输更改值而不传输整个处理器本身的特定对象(以节省更多流量)。

于 2016-10-18T07:39:49.740 回答
1

在 Coherence 中,您可以使用PartitionedService来查找缓存键与集群成员的关联。然后,您可以使用每个成员的数据调用入口处理器,PartitionedFilter以确保仅将数据发送给该成员。像这样的东西:

// keys in this map are also keys in cache
void processData(Map<String, Data> externalData) { 
    PartitionedService partitionedService = (PartitionedService) cache.getCacheService();
    Map<Member, Map<String, Data>> dataForMembers = splitDataByMembers(partitionedService, externalData);

    for (Entry<Member, Map<String, Data>> dataForMember : dataForMembers.entrySet()) {
        Member member = dataForMember.getKey();
        Map<String, Data> data = dataForMember.getValue();

        PartitionSet partitions = partitionedService.getOwnedPartitions(member);
        PartitionedFilter filter = new PartitionedFilter<>(Filters.always(), partitions);
        EntryProcessor processor = new MyEntryProcessor(data);
        cache.async().invokeAll(filter, processor);
    }
}

Map<Member, Map<String, Data>> splitDataByMembers(
        PartitionedService partitionedService,
        Map<String, Data> externalData) {
    Map<Member, Map<String, Data>> dataForMembers = new HashMap<>();

    for (Object member : partitionedService.getInfo().getServiceMembers()) {
        dataForMembers.put((Member) member, new HashMap<>());
    }
    for (Entry<String, Data> dataForKey : externalData.entrySet()) {
        Member member = partitionedService.getKeyOwner(dataForKey.getKey());
        dataForMembers.get(member).put(dataForKey.getKey(), dataForKey.getValue());
    }
    return dataForMembers;
}

这样,集群中的每个成员将只有一个入口处理器调用,并且每个成员将只获得它感兴趣的数据。

我用作与此键关联的数据String的缓存键和任意Data类型,但您当然可以使用任何其他类型(并且您根本不必将外部数据建模为映射)。

于 2016-10-19T10:47:32.483 回答
0

在 Hazelcast 中,你会这样做executeOnKeys(keys, new EntryProcessor(data)),这太多了,因为数据太大了。

为什么不

executeOnKey(key1, new EntryProcessor(data1));
executeOnKey(key2, new EntryProcessor(data2));
executeOnKey(key3, new EntryProcessor(data3));

发送每个键需要的数据子集?

于 2016-10-18T07:37:48.357 回答