As a Hazelcast Jet newbie, I am trying to build a setup where single items from an infinite source (i.e., a map journal of user requests) are MapReduced against a (possibly changing and) huge map of reference items.
Specifically, for this example I want to determine the IDs of the vectors (read: float[]) with the smallest Euclidean distance in a map of reference vectors, given a user-defined input vector (the query).
Implemented naively on a single machine, this would iterate over the map of reference items and determine each one's Euclidean distance to the query while keeping the k smallest matches; the input comes from a user request (HTTP POST, a button click, etc.) and the result set is available right after the computation finishes.
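For illustration, the single-machine version is roughly the following (just a sketch; kNearest is a placeholder name, and I rank by squared distance, which orders the same as the true Euclidean distance):

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Naive single-node k-nearest-neighbour scan over the reference map.
static List<Long> kNearest(Map<Long, float[]> referenceVectors, float[] query, int k) {
    // max-heap by distance, so the worst of the current k best sits on top
    PriorityQueue<SimpleEntry<Long, Float>> worstOnTop =
            new PriorityQueue<>((a, b) -> Float.compare(b.getValue(), a.getValue()));
    for (Map.Entry<Long, float[]> e : referenceVectors.entrySet()) {
        float[] ref = e.getValue();
        float distance = 0f;
        for (int i = 0; i < query.length; ++i) {
            float d = query[i] - ref[i];
            distance += d * d;
        }
        if (worstOnTop.size() < k) {
            worstOnTop.add(new SimpleEntry<>(e.getKey(), distance));
        } else if (distance < worstOnTop.peek().getValue()) {
            worstOnTop.poll();
            worstOnTop.add(new SimpleEntry<>(e.getKey(), distance));
        }
    }
    // collect the IDs of the k closest vectors (heap order, not sorted)
    List<Long> ids = new ArrayList<>();
    for (SimpleEntry<Long, Float> entry : worstOnTop) {
        ids.add(entry.getKey());
    }
    return ids;
}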
My latest approach is to:
- listen for requests on a map journal,
- .distributed().broadcast() the requests to the mapping jobs,
- have the mapping jobs obtain the .localKeySet() of the reference vectors,
- emit the IDs of the k smallest vectors (by Euclidean distance),
- reduce/collect the results on a single node by partitioning with .partitioned(item -> item.requestId),
- store the results into a map on which the client has a key listener (a sketch of that listener follows this list).
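The last step uses the plain Hazelcast entry-listener API; on the client it would look roughly like this (a sketch, where RESULTS_MAP_NAME, the Object[] value type, and the timeout are placeholders mirroring my other map-name constants):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.map.listener.EntryAddedListener;

// Block until the result for one request shows up in the results map.
static Object[] awaitResult(HazelcastInstance client, long requestId) throws Exception {
    IMap<Long, Object[]> results = client.getMap(RESULTS_MAP_NAME);
    CompletableFuture<Object[]> future = new CompletableFuture<>();
    // listen only for this request's key; 'true' means the event carries the value
    String registration = results.addEntryListener(
            (EntryAddedListener<Long, Object[]>) e -> future.complete(e.getValue()),
            requestId, true);
    try {
        return future.get(30, TimeUnit.SECONDS);
    } finally {
        results.removeEntryListener(registration);
    }
}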
Conceptually, every query here is a batch of size 1, so I am effectively doing batch processing all the time. However, I have a hard time letting the mappers and reducers know when a batch is done, so that the collectors know when they are done (and can emit the final result).
I have tried using watermarks with real and fake timestamps (obtained automatically via an AtomicLong instance) and emitting from the tryProcessWm function, but that seems to be a very brittle solution, since some of the events get dropped. I also need to make sure that no two requests are interleaved (i.e., partition on the request ID), but at the same time have the mapper run on all nodes...
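Concretely, the timestamping stage (AddTimestampMapP, referenced in the DAG further down) is essentially the following; the static AtomicLong (i.e., one counter per member) is an assumption of this sketch, and my actual class may differ in details:

// Sketch: tag each (requestId, queryVector) pair with a strictly
// increasing fake timestamp taken from a shared AtomicLong.
private static class AddTimestampMapP extends AbstractProcessor {
    private static final AtomicLong TIMESTAMP = new AtomicLong();

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple2<Long, float[]> query = (Tuple2<Long, float[]>) item;
        return tryEmit(Tuple3.tuple3(query.f0(), TIMESTAMP.incrementAndGet(), query.f1()));
    }
}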
How would I attack this task?
Edit #1:
Right now, my mapper looks like this:
private static class EuclideanDistanceMapP extends AbstractProcessor {
    private IMap<Long, float[]> referenceVectors;
    final ScoreComparator comparator = new ScoreComparator();

    @Override
    protected void init(@Nonnull Context context) throws Exception {
        this.referenceVectors = context.jetInstance().getMap(REFERENCE_VECTOR_MAP_NAME);
        super.init(context);
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple3<Long, Long, float[]> query = (Tuple3<Long, Long, float[]>) item;
        final long requestId = query.f0();
        final long timestamp = query.f1();
        final float[] queryVector = query.f2();
        final TreeSet<Tuple2<Long, Float>> buffer = new TreeSet<>(comparator);
        for (Long vectorKey : referenceVectors.localKeySet()) {
            float[] referenceVector = referenceVectors.get(vectorKey);
            float distance = 0.0f;
            for (int i = 0; i < queryVector.length; ++i) {
                distance += (queryVector[i] - referenceVector[i]) * (queryVector[i] - referenceVector[i]);
            }
            final Tuple2<Long, Float> score = Tuple2.tuple2(vectorKey, (float) Math.sqrt(distance));
            if (buffer.size() < MAX_RESULTS) {
                buffer.add(score);
                continue;
            }
            // If the value is larger than the largest entry, discard it.
            if (comparator.compare(score, buffer.last()) >= 0) {
                continue;
            }
            // Otherwise we remove the largest entry after adding the new one.
            buffer.add(score);
            buffer.pollLast();
        }
        return tryEmit(Tuple3.tuple3(requestId, timestamp, buffer.toArray()));
    }
    private static class ScoreComparator implements Comparator<Tuple2<Long, Float>> {
        @Override
        public int compare(Tuple2<Long, Float> a, Tuple2<Long, Float> b) {
            // Compare by distance, but break ties on the vector key: the TreeSet
            // treats compare() == 0 as duplicates, so two distinct vectors with
            // equal distance would otherwise collapse into a single entry.
            final int byDistance = Float.compare(a.f1(), b.f1());
            return byDistance != 0 ? byDistance : Long.compare(a.f0(), b.f0());
        }
    }
}
The reducer essentially duplicates this (minus the vector calculation, of course).
Edit #2:
This is the DAG setup. It currently fails whenever there is more than one concurrent request: most items get dropped because of the watermarks.
DAG dag = new DAG();

Vertex sourceStream = dag.newVertex("source",
        SourceProcessors.<Long, float[], Tuple2<Long, float[]>>streamMapP(QUERY_VECTOR_MAP_NAME,
                e -> e.getType() == EntryEventType.ADDED || e.getType() == EntryEventType.UPDATED,
                e -> Tuple2.tuple2(e.getKey(), e.getNewValue()), true));
// simple map() using an AtomicLong to create the timestamp
Vertex addTimestamps = dag.newVertex("addTimestamps", AddTimestampMapP::new);
// the class shown above
Vertex map = dag.newVertex("map", EuclideanDistanceMapP::new);
Vertex insertWatermarks = dag.newVertex("insertWatermarks",
        insertWatermarksP((Tuple3<Long, Long, float[]> t) -> t.f1(), withFixedLag(0), emitByMinStep(1)));
Vertex combine = dag.newVertex("combine", CombineP::new);
// simple map() that drops the timestamp
Vertex removeTimestamps = dag.newVertex("removeTimestamps", RemoveTimestampMapP::new);
// Using a list here for testing.
Vertex sink = dag.newVertex("sink", SinkProcessors.writeListP(SINK_NAME));

dag.edge(between(sourceStream, addTimestamps))
   .edge(between(addTimestamps, map.localParallelism(1))
           .broadcast()
           .distributed())
   .edge(between(map, insertWatermarks).isolated())
   .edge(between(insertWatermarks, combine.localParallelism(1))
           .distributed()
           .partitioned((Tuple2<Long, Tuple2<Long, Float>[]> item) -> item.f0()))
   .edge(between(combine, removeTimestamps)
           .partitioned((Tuple3<Long, Long, Tuple2<Long, Float>[]> item) -> item.f0()))
   .edge(between(removeTimestamps, sink.localParallelism(1)));
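The helper stages referenced in the comments are trivial; for instance, RemoveTimestampMapP just projects the timestamp away, roughly like this (a sketch, my actual class may differ in details):

// Sketch: (requestId, timestamp, scores) -> (requestId, scores).
private static class RemoveTimestampMapP extends AbstractProcessor {
    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple3<Long, Long, Object[]> result = (Tuple3<Long, Long, Object[]>) item;
        return tryEmit(Tuple2.tuple2(result.f0(), result.f2()));
    }
}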
Edit #3:
This is my current combiner implementation. I assume that all items arrive ordered according to the watermarks, or at least that items of the same request will be collected by the same combiner instance. That does not seem to be true, though...
private static class CombineP extends AbstractProcessor {
    private final ScoreComparator comparator = new ScoreComparator();
    private final TreeSet<Tuple2<Long, Float>> buffer = new TreeSet<>(comparator);
    private Long requestId;
    private Long timestamp = -1L;

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple3<Long, Long, Tuple2<Long, Float>[]> itemTuple = (Tuple3<Long, Long, Tuple2<Long, Float>[]>) item;
        requestId = itemTuple.f0();
        final long currentTimestamp = itemTuple.f1();
        if (currentTimestamp > timestamp) {
            // a newer request has started, drop the buffered results of the previous one
            buffer.clear();
        }
        timestamp = currentTimestamp;
        final Object[] scores = itemTuple.f2();
        for (Object scoreObj : scores) {
            final Tuple2<Long, Float> score = (Tuple2<Long, Float>) scoreObj;
            if (buffer.size() < MAX_RESULTS) {
                buffer.add(score);
                continue;
            }
            // If the value is larger than the largest entry, discard it.
            if (comparator.compare(score, buffer.last()) >= 0) {
                continue;
            }
            // Otherwise we remove the largest entry after adding the new one.
            buffer.add(score);
            buffer.pollLast();
        }
        return true;
    }

    @Override
    protected boolean tryProcessWm(int ordinal, @Nonnull Watermark wm) {
        // emit the buffered result when the watermark for the request arrives
        return tryEmit(Tuple3.tuple3(requestId, timestamp, buffer.toArray())) && super.tryProcessWm(ordinal, wm);
    }
    private static class ScoreComparator implements Comparator<Tuple2<Long, Float>> {
        @Override
        public int compare(Tuple2<Long, Float> a, Tuple2<Long, Float> b) {
            // Same tie-breaking as in the mapper: compare() == 0 would make the
            // TreeSet drop distinct vectors that happen to have equal distance.
            final int byDistance = Float.compare(a.f1(), b.f1());
            return byDistance != 0 ? byDistance : Long.compare(a.f0(), b.f0());
        }
    }
}