0

我的用例需要读取来自 Kafka 主题的消息,并在消息发布到 Kafka 时按自然顺序处理这些消息。

Kafka生产者负责发布在单个kafka topic-partition中排序的每组消息,我需要在同一个Vertex-Processor中以相同的顺序处理每组消息。

在此处输入图像描述

上图代表基本思想。有一些从 Kafka 读取的 KafkaSource-Processors。

并且一条边连接到一个顶点以解码 kafka 消息等等。

我可以使用 kafka 消息键作为分区键,但我认为我最终会得到不平衡的解码处理器。

鉴于:

  • 如何创建新的 Partitioner ?我找不到任何可以启发我的例子。
  • 在新的分区器上,如何识别发出消息的 KS 处理器?我想在前一个顶点进程和下一个顶点处理器之间建立一对一的关系,例如,KS#0 总是将消息发送到 Decode#0,KS#1 到 Decode#1 等等。
  • 我需要一个新的分区器还是有一些开箱即用的功能来实现它?
4

1 回答 1

2

您不需要为此使用分区程序。为此设计了Edge.isolated()相同的本地并行性:

dag.edge(between(kafkaSource, decode).isolated());

在这种情况下,源处理器的一个实例与目标处理器的一个实例绑定,并且项目的顺序将被保留。请记住,单个 Kafka 源处理器可以从多个 Kafka 分区中获取项目,因此您必须跟踪 Kafka 分区 ID。即使让 Jet 处理器和 Kafka 分区的总数相等,也不能依赖它,因为如果其中一个成员失败并重新启动作业,Jet 处理器的总数会减少,但 Kafka 分区的数量会获胜不。

另请注意,源的默认本地并行度不相等:对于 Kafka 源,它默认为 2,对于其他源,它通常等于 CPU 计数。您需要手动指定相等的值。

另一个限制是,如果您使用Processors.mapP顶点decode,映射函数必须是无状态的。因为您需要订购物品,所以我假设您有一些状态要保留。要使其正常工作,您必须使用自定义处理器:

Vertex decode = dag.newVertex("decode", MyDecodeP::new);

处理器实现:

private static class MyDecodeP extends AbstractProcessor {
    private Object myStateObject;

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        Object mappedItem = ...;
        return tryEmit(mappedItem);
    }
}

答案是为 Jet 0.5.1 编写的。

于 2018-01-29T09:49:52.493 回答