1

我是 Hazelcast 的新手,我正在尝试使用它来将数据存储在一张太大而无法容纳在单台机器上的地图中。

我需要实现的过程之一是遍历地图中的每个值并对它们做一些事情 - 不累积或聚合,我不需要一次查看所有数据,因此没有内存问题接着就,随即。

我的简单实现是使用IMap.keySet()然后遍历所有键以依次获取每个存储的值(并允许在处理后对值进行 GC),但我担心系统中会有太多数据即使只是获取密钥列表也足以给系统带来过度的压力。

我希望有一个流式 API,我可以流式传输密钥(甚至完整条目),这样本地节点就不必在本地缓存整个集合 - 但在文档。

如果您提出任何建议,我将不胜感激。谢谢。

4

2 回答 2

1

Hazelcast Jet 提供分布式版本j.u.s并向IMap. 它允许在 Hazelcast 集群上执行 Java Streams API。

import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.stream.DistributedCollectors;
import com.hazelcast.jet.stream.IStreamMap;
import com.hazelcast.jet.stream.IStreamList;

import static com.hazelcast.jet.stream.DistributedCollectors.toIList;

    final IStreamMap<String, Integer> streamMap = instance1.getMap("source");
    // stream of entries, you can grab keys from it
    IStreamList<String> counts = streamMap.stream()
                    .map(entry -> entry.getKey().toLowerCase())
                    .filter(key -> key.length() >= 5)
                    .sorted()
                    // this will store the result on cluster as well
                    // so there is no data movement between client and cluster
                    .collect(toIList());

请在此处找到有关 jet 的更多信息,并此处找到更多示例。

干杯,维克

于 2017-02-15T13:39:12.823 回答
0

虽然 Hazelcast Jet 流的实现看起来令人印象深刻,但我没有太多时间投资于升级到 Hazelcast Jet(在我们几乎标准的 vert.x 设置中)。相反,我使用IMap.executeOnEntries的似乎与@Vik Gamov 为 Hazelcast Jet 详述的内容大致相同,只是语法更烦人。

我的例子:

myMap.executeOnEntries(new EntryProcessor<String,MyEntity>(){
    private static final long serialVersionUID = 1L;
    @Override
    public Object process(Entry<String, MyEntity> entry) {
        entry.getValue().fondle();
        return null;
    }
    @Override
    public EntryBackupProcessor<String, MyEntity> getBackupProcessor() {
        return null;
    }});

如您所见,语法很烦人:

  1. 我们需要创建一个可以序列化到集群的实际对象——这里没有花哨的 lambdas(不要使用我的序列号,如果你复制并粘贴它——它被设计破坏了)。
  2. 它不能是 lambda 的一个原因是接口不起作用 - 您需要另一种方法来处理备份副本(或者至少声明您不想像我一样处理它们),虽然我承认它的重要性,它一直都不重要,我猜它只在极少数情况下才重要。
  3. 显然你不能(或者至少它不是微不足道的)从进程中返回数据——这在我的情况下并不重要,但仍然很重要。
于 2017-03-09T08:52:51.363 回答