1

摘要:我正在尝试利用 jet 管道进行高负载聚合。我发现绝大多数运行时间都是通过字节流对我的地图记录进行序列化和反序列化(它们实现 DataSerializable)。我认为将聚合操作分发到节点的目的是它们可以直接访问堆中的分布式成员。

管道:

pipeline.drawFrom(source)
.aggregate(aggregate)
.drainTo(sink);

资源:

Sources.<Key, Record>map("mapname")

总计的:

AggregateOperation1<Entry<Key, Record>, T, Result>=
AggregateOperation
                .withCreate(() -> {
                    Accumulator a = new Accumulator(this);
                    a.initialize();
                    return a;
                }).<Entry<Key, Record>>andAccumulate(
                        (acc, row) -> acc.apply(row))
                .andCombine(
                        (left, right) -> left.combine(right))
                .andFinish(acc -> acc.finish());

我发现在执行管道作业时,源是在地图中读取/写入记录:

Record.readData
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:158)
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:105)
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:50)
        at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
        at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:191)
        at com.hazelcast.query.impl.CachedQueryEntry.getValue(CachedQueryEntry.java:75)
        at Accumulator.apply(Accumulator.java:102)

触发此操作的 Accumulator.apply 中的代码(来自“andAccumulate”步骤)看起来像

private void apply(Entry<IntArr, Record> entry) {
    Record record = entry.getValue();
    ...
}

如何创建一个 Jet 地图数据源,将地图本地条目或值提供给 AggregateOperation 在每个节点和线程上累积调用而不调用序列化?我正在做一些具体的事情来让它以这种方式表现吗?我相信集群设置为使用备份记录进行聚合;那会导致这个吗?(我还没有确认每条记录都发生这种情况)

编辑:这是我当前的 MapConfig:

MapConfig mapConfig = new MapConfig(mapName)
    .setStatisticsEnabled(true)
    .setReadBackupData(true)
    .setInMemoryFormat(InMemoryFormat.OBJECT);

据我了解http://docs.hazelcast.org/docs/3.10.4/manual/html-single/index.html#setting-in-memory-format InMemoryFormat.OBJECT 应该指示 IMap 将值保留在它们的目标(反序列化)形式。

4

1 回答 1

1
  1. Hazelcast IMap 以序列化形式存储数据。当您从源获取它时,您会得到一个Map.Entry实例,该实例将在请求时延迟反序列化其键/值。这发生在您的entry.getValue()通话中。

  2. 您的管道要求数据聚合:输出是反映所有输入数据的单个项目。为了达到这个结果,Jet 必须将所有部分结果发送给单个成员,在该成员中调用您的Accumulator.combine方法来组合它们。与上述步骤相比,此步骤的 ser/de 影响应该可以忽略不计。

于 2018-08-28T07:39:06.380 回答