1

我前段时间使用过 Hazelcast,而且我是第一次使用 Hazelcast Jet,对于处理一些实时流式传输,进行探索似乎很有趣。

在这里,我有一种情况,我正在Kafka topic使用IMap

private static Pipeline buildPipelineForClientDataa() {
        Pipeline p = Pipeline.create();
        p.drawFrom(KafkaSources.kafka(
                props("bootstrap.servers", BOOTSTRAP_SERVERS, 
                        "key.deserializer", StringDeserializer.class.getCanonicalName(), 
                        "value.deserializer", StringDeserializer.class.getCanonicalName(), 
                        "auto.offset.reset", AUTO_OFFSET_RESET), 
                KAFKA_TOPIC))
        .withoutTimestamps()
        .drainTo(Sinks.map(SINK_CLINET_DATA));
        return p;
    }

好吧,我没有这个话题的关键。我应该可以选择将滚动号码分配为密钥吗?如果是这样,请帮助我使用该技术。谢谢。

4

1 回答 1

0

由于 Jet 是一个分布式系统,因此使用递增数字并不适合。它适用于分区流,每个流分区应该是独立的。您需要通过非并行处理器路由所有项目。

您可以使用UUID或 HazelcastFlakeIdGenerator作为键,但如果作业重新启动并从快照偏移量重新处理 Kafka 主题,相同的项目将分配不同的键,并将在目标映射中出现两次。

如果想拥有map中的每一项,可以使用Kafka的topic+partitionId+offset组合作为key:

p.drawFrom(KafkaSources.kafka(
    props(...),
    record -> Util.entry(
        Tuple3.tuple3(record.topic(), record.partition(), record.offset()),
        record.value()),
    KAFKA_TOPIC))

如果您只有一个主题,则可以省略该主题。

于 2019-05-02T15:42:38.630 回答