2

作为 Hazelcast Jet 的新手,我试图建立一个设置,其中来自无限来源的单个项目(即用户请求的地图日志)针对(可能变化的和)巨大的参考项目地图进行 MapReduced。

具体来说,对于这个例子,我想在给定使用定义的输入向量(查询)的情况下确定向量映射(参考float[])中最小欧几里德距离的向量(读取:)的 ID 。

如果在单台机器上天真地实现,这将遍历引用的 Map 项并确定每个项到查询的欧几里德距离,同时保持 k 最小匹配,其中输入来自用户请求( HTTP POST、按钮单击等),计算完成后结果集立即可用。

我最近的方法是:

  • 在地图日志上收听请求
  • .distributed().broadcast()对映射作业的请求
  • 让映射作业获得.localKeySet()参考向量的
  • 发出 k 最小向量的 ID(按欧几里得距离)
  • .partitioned(item -> item.requestId)通过分区减少/收集单个节点上的结果
  • 将结果存储到客户端具有关键侦听器的地图中。

从概念上讲,这里的每个查询都是一批大小1,我实际上是在批量处理的时候。但是,我很难让映射器和化简器知道批次何时完成,以便收集器知道它们何时完成(以便他们可以发出最终结果)。

我尝试使用带有真实和虚假时间戳的水印(通过AtomicLong实例自动获得)并从tryProcessWm函数中发出,但这似乎是一个非常脆弱的解决方案,因为一些事件被丢弃了。我还需要确保没有两个请求是交错的(即在请求 ID 上使用分区),但同时让映射器在所有节点上运行......

我将如何攻击这个任务?


编辑#1:

现在,我的映射器看起来像这样:

private static class EuclideanDistanceMapP extends AbstractProcessor {
    private IMap<Long, float[]> referenceVectors;

    final ScoreComparator comparator = new ScoreComparator();

    @Override
    protected void init(@Nonnull Context context) throws Exception {
        this.referenceVectors = context.jetInstance().getMap(REFERENCE_VECTOR_MAP_NAME);
        super.init(context);
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple3<Long, Long, float[]> query = (Tuple3<Long, Long, float[]>)item;
        final long requestId = query.f0();
        final long timestamp = query.f1();
        final float[] queryVector = query.f2();

        final TreeSet<Tuple2<Long, Float>> buffer = new TreeSet<>(comparator);
        for (Long vectorKey : referenceVectors.localKeySet()) {
            float[] referenceVector = referenceVectors.get(vectorKey);
            float distance = 0.0f;

            for (int i = 0; i < queryVector.length; ++i) {
                distance += (queryVector[i] - referenceVector[i]) * (queryVector[i] - referenceVector[i]);
            }

            final Tuple2<Long, Float> score = Tuple2.tuple2(vectorKey, (float) Math.sqrt(distance));
            if (buffer.size() < MAX_RESULTS) {
                buffer.add(score);
                continue;
            }

            // If the value is larger than the largest entry, discard it.
            if (comparator.compare(score, buffer.last()) >= 0) {
                continue;
            }

            // Otherwise we remove the largest entry after adding the new one.
            buffer.add(score);
            buffer.pollLast();
        }

        return tryEmit(Tuple3.tuple3(requestId, timestamp, buffer.toArray()));
    }

    private static class ScoreComparator implements Comparator<Tuple2<Long, Float>> {
        @Override
        public int compare(Tuple2<Long, Float> a, Tuple2<Long, Float> b) {
            return Float.compare(a.f1(), b.f1());
        }
    }
}

reducer 本质上是在复制它(当然,减去向量计算)。


编辑#2:

这是 DAG 设置。当前,当有多个并发请求时,它会失败。大多数项目由于水印而被丢弃。

DAG dag = new DAG();
Vertex sourceStream = dag.newVertex("source",
    SourceProcessors.<Long, float[], Tuple2<Long, float[]>>streamMapP(QUERY_VECTOR_MAP_NAME,
            e -> e.getType() == EntryEventType.ADDED || e.getType() == EntryEventType.UPDATED,
            e -> Tuple2.tuple2(e.getKey(), e.getNewValue()),true));

// simple map() using an AtomicLong to create the timestamp    
Vertex addTimestamps = dag.newVertex("addTimestamps", AddTimestampMapP::new);

// the class shown above.
Vertex map = dag.newVertex("map", EuclideanDistanceMapP::new);

Vertex insertWatermarks = dag.newVertex("insertWatermarks",
        insertWatermarksP((Tuple3<Long, Long, float[]> t) -> t.f1(), withFixedLag(0), emitByMinStep(1)));

Vertex combine = dag.newVertex("combine", CombineP::new);

// simple map() that drops the timestamp
Vertex removeTimestamps = dag.newVertex("removeTimestamps", RemoveTimestampMapP::new);

// Using a list here for testing.
Vertex sink = dag.newVertex("sink", SinkProcessors.writeListP(SINK_NAME));

dag.edge(between(sourceStream, addTimestamps))
    .edge(between(addTimestamps, map.localParallelism(1))
        .broadcast()
        .distributed())
    .edge(between(map, insertWatermarks).isolated())
    .edge(between(insertWatermarks, combine.localParallelism(1))
            .distributed()
            .partitioned((Tuple2<Long, Tuple2<Long, Float>[]> item) -> item.f0()))
    .edge(between(combine, removeTimestamps)
            .partitioned((Tuple3<Long, Long, Tuple2<Long, Float>[]> item) -> item.f0()))
    .edge(between(removeTimestamps, sink.localParallelism(1)));

编辑#3:

这是我当前的组合器实现。我假设所有物品都会根据水印订购;或者一般来说,相同请求的项目将由相同的组合器实例收集。虽然这似乎不是真的......

private static class CombineP extends AbstractProcessor {
    private final ScoreComparator comparator = new ScoreComparator();
    private final TreeSet<Tuple2<Long, Float>> buffer = new TreeSet<>(comparator);
    private Long requestId;
    private Long timestamp = -1L;

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple3<Long, Long, Tuple2<Long, Float>[]> itemTuple = (Tuple3<Long, Long, Tuple2<Long, Float>[]>)item;
        requestId = itemTuple.f0();

        final long currentTimestamp = itemTuple.f1();
        if (currentTimestamp > timestamp) {
            buffer.clear();
        }
        timestamp = currentTimestamp;

        final Object[] scores = itemTuple.f2();

        for (Object scoreObj : scores) {
            final Tuple2<Long, Float> score = (Tuple2<Long, Float>)scoreObj;

            if (buffer.size() < MAX_RESULTS) {
                buffer.add(score);
                continue;
            }

            // If the value is larger than the largest entry, discard it.
            if (comparator.compare(score, buffer.last()) >= 0) {
                continue;
            }

            // Otherwise we remove the largest entry after adding the new one.
            buffer.add(score);
            buffer.pollLast();
        }

        return true;
    }

    @Override
    protected boolean tryProcessWm(int ordinal, @Nonnull Watermark wm) {
        // return super.tryProcessWm(ordinal, wm);
        return tryEmit(Tuple3.tuple3(requestId, timestamp, buffer.toArray())) && super.tryProcessWm(ordinal, wm);
    }

    private static class ScoreComparator implements Comparator<Tuple2<Long, Float>> {
        @Override
        public int compare(Tuple2<Long, Float> a, Tuple2<Long, Float> b) {
            return Float.compare(a.f1(), b.f1());
        }
    }
}
4

1 回答 1

1

您必须始终记住,两个顶点之间的项目可以重新排序。当您有并行请求时,它们的中间结果可以交错在CombineP.

CombineP中,您可以依靠中间结果的数量等于集群中的成员数量这一事实。init从中计算参与成员的数量globalParallelism / localParallelism。当您收到此数量的中间体时,您可以发出最终结果。

另一个技巧可能是在每个成员上并行运行多个请求。您可以通过使用两个边缘来实现这一点: 1. 广播+分布式边缘到并行度=1 处理器 2. 单播边缘到并行度=N 处理器

另请注意,localKeys不适合巨大的地图:查询大小是有限的。

这是上面的代码。代码适用于 Jet 0.5:

DAG:

DAG dag = new DAG();
Vertex sourceStream = dag.newVertex("source",
        streamMapP(QUERY_VECTOR_MAP_NAME,
                e -> e.getType() == EntryEventType.ADDED || e.getType() == EntryEventType.UPDATED,
                e -> entry(e.getKey(), e.getNewValue()),true));

Vertex identity = dag.newVertex("identity", mapP(identity()))
        .localParallelism(1);
Vertex map = dag.newVertex("map", peekOutputP(EuclideanDistanceMapP::new));
Vertex combine = dag.newVertex("combine", peekOutputP(new CombineMetaSupplier()));
Vertex sink = dag.newVertex("sink", writeListP(SINK_NAME));

dag.edge(between(sourceStream, identity)
           .broadcast()
           .distributed())
   .edge(between(identity, map))
   .edge(between(map, combine)
           .distributed()
           .partitioned((Entry item) -> item.getKey()))
   .edge(between(combine, sink));

EuclideanDistanceMapP 类:

private static class EuclideanDistanceMapP extends AbstractProcessor {

    private IMap<Long, float[]> referenceVectors;
    final ScoreComparator comparator = new ScoreComparator();
    private Object pendingItem;

    @Override
    protected void init(@Nonnull Context context) throws Exception {
        this.referenceVectors = context.jetInstance().getMap(REFERENCE_VECTOR_MAP_NAME);
        super.init(context);
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        if (pendingItem == null) {
            final Entry<Long, float[]> query = (Entry<Long, float[]>) item;
            final long requestId = query.getKey();
            final float[] queryVector = query.getValue();

            final PriorityQueue<Entry<Long, Float>> buffer = new PriorityQueue<>(comparator.reversed());
            for (Long vectorKey : referenceVectors.localKeySet()) {
                float[] referenceVector = referenceVectors.get(vectorKey);
                float distance = 0.0f;

                for (int i = 0; i < queryVector.length; ++i) {
                    distance += (queryVector[i] - referenceVector[i]) * (queryVector[i] - referenceVector[i]);
                }

                final Entry<Long, Float> score = entry(vectorKey, (float) Math.sqrt(distance));
                if (buffer.size() < MAX_RESULTS || comparator.compare(score, buffer.peek()) < 0) {
                    if (buffer.size() == MAX_RESULTS)
                        buffer.remove();
                    buffer.add(score);
                }
            }
            pendingItem = entry(requestId, buffer.toArray(new Entry[0]));
        }
        if (tryEmit(pendingItem)) {
            pendingItem = null;
            return true;
        }
        return false;
    }
}

组合P类:

private static class CombineP extends AbstractProcessor {
    private final ScoreComparator comparator = new ScoreComparator();
    private final Map<Long, PriorityQueue<Entry<Long, Float>>> buffer = new HashMap<>();
    private final Map<Long, Integer> accumulatedCount = new HashMap<>();
    private final int upstreamMemberCount;
    private Entry<Long, Entry<Long, Float>[]> pendingItem;

    private CombineP(int upstreamMemberCount) {
        this.upstreamMemberCount = upstreamMemberCount;
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        if (pendingItem == null) {
            final Entry<Long, Entry<Long, Float>[]> localValue = (Entry<Long, Entry<Long, Float>[]>) item;
            long requestId = localValue.getKey();
            PriorityQueue<Entry<Long, Float>> globalValue = buffer.computeIfAbsent(requestId, key -> new PriorityQueue<>(comparator.reversed()));
            globalValue.addAll(asList(localValue.getValue()));
            while (globalValue.size() > MAX_RESULTS) {
                globalValue.remove();
            }
            int count = accumulatedCount.merge(requestId, 1, Integer::sum);
            if (count == upstreamMemberCount) {
                // we've received enough local values, let's emit and remove the accumulator
                pendingItem = entry(requestId, globalValue.toArray(new Entry[0]));
                Arrays.sort(pendingItem.getValue(), comparator);
                buffer.remove(requestId);
                accumulatedCount.remove(requestId);
            } else {
                return true;
            }
        }
        if (tryEmit(pendingItem)) {
            pendingItem = null;
            return true;
        }
        return false;
    }
}

您还需要自定义元供应商CombineP

private static class CombineMetaSupplier implements ProcessorMetaSupplier {
    private int upstreamMemberCount;

    @Override
    public void init(@Nonnull Context context) {
        upstreamMemberCount = context.totalParallelism() / context.localParallelism();
    }

    @Nonnull
    @Override
    public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
        return address -> ProcessorSupplier.of(() -> new CombineP(upstreamMemberCount));
    }
}
于 2018-01-16T15:28:55.700 回答