摘要:我正在尝试利用 jet 管道进行高负载聚合。我发现绝大多数运行时间都是通过字节流对我的地图记录进行序列化和反序列化(它们实现 DataSerializable)。我认为将聚合操作分发到节点的目的是它们可以直接访问堆中的分布式成员。
管道:
pipeline.drawFrom(source)
.aggregate(aggregate)
.drainTo(sink);
资源:
Sources.<Key, Record>map("mapname")
总计的:
AggregateOperation1<Entry<Key, Record>, T, Result>=
AggregateOperation
.withCreate(() -> {
Accumulator a = new Accumulator(this);
a.initialize();
return a;
}).<Entry<Key, Record>>andAccumulate(
(acc, row) -> acc.apply(row))
.andCombine(
(left, right) -> left.combine(right))
.andFinish(acc -> acc.finish());
我发现在执行管道作业时,源是在地图中读取/写入记录:
Record.readData
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:158)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:105)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:50)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:191)
at com.hazelcast.query.impl.CachedQueryEntry.getValue(CachedQueryEntry.java:75)
at Accumulator.apply(Accumulator.java:102)
触发此操作的 Accumulator.apply 中的代码(来自“andAccumulate”步骤)看起来像
private void apply(Entry<IntArr, Record> entry) {
Record record = entry.getValue();
...
}
如何创建一个 Jet 地图数据源,将地图本地条目或值提供给 AggregateOperation 在每个节点和线程上累积调用而不调用序列化?我正在做一些具体的事情来让它以这种方式表现吗?我相信集群设置为使用备份记录进行聚合;那会导致这个吗?(我还没有确认每条记录都发生这种情况)
编辑:这是我当前的 MapConfig:
MapConfig mapConfig = new MapConfig(mapName)
.setStatisticsEnabled(true)
.setReadBackupData(true)
.setInMemoryFormat(InMemoryFormat.OBJECT);
据我了解http://docs.hazelcast.org/docs/3.10.4/manual/html-single/index.html#setting-in-memory-format InMemoryFormat.OBJECT 应该指示 IMap 将值保留在它们的目标(反序列化)形式。