6

在 Kafka Streams 中,产生/写入流的规范方式是什么?在 Spark 中,有一个自定义接收器,它作为来自任意数据源的长时间运行的适配器。Kafka Streams 中的等价物是什么?

具体来说,我不是在问如何从一个主题转换到另一个主题。文档对此非常清楚。我想了解如何编写我的工人,这些工人将在一系列转换到 Kafka 中进行第一次写入。

我希望能够做到

builder1.<something>(<some intake worker like a spark reciver)
       .to(topic1)
       .start()

builder2.from(topic1)
        .transform(<some transformation function>)
        .to(topic2)
        .start()

但是现有的文档都没有显示这一点?我错过了什么吗?

4

3 回答 3

10

取决于您使用的是 Kafka Streams DSL 还是 Processor API:

  • Kafka Streams DSL 您可以KStream#to()用来实现KStream主题。这是将数据具体化为主题的规范方法。或者,您可以使用KStream#through(). 这也会将数据具体化为主题,但也会返回结果KStream以供进一步使用。#to()和之间的唯一区别#through()是,KStreamBuilder#stream()如果您希望将生成的物化分区作为KStream.

  • 处理器 API通过将数据转发到接收器处理器,您可以将数据具体化到分区。

无论哪种方式,需要注意的重要一点是,在您使用上述方法之一写入分区之前,数据不会具体化到主题。map(), filter(), 等不实现数据。数据保留在处理器任务/线程/内存中,直到通过上述方法之一实现。


要制作成 Kafka Streams:

Properties producerConfig = new Properties();
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:2181");
producerConfig.put(ACKS_CONFIG, "all");
producerConfig.put(RETRIES_CONFIG, 0);
Producer<Integer, Integer> producer = new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer<>());

接着:

Arrays.asList(1, 2, 3, 4).forEach(integer -> producer.send(new ProducerRecord<>("integers", integer, integer)))

你会需要:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${version.kafka}</version>
</dependency>
于 2016-06-30T20:04:36.440 回答
5

我想了解如何编写我的工人,这些工人将在一系列转换为 kafka 的过程中进行第一次写入。

初始写入(= 输入数据)不应通过 Kafka Streams 完成。Kafka Streams 假设输入数据已经在 Kafka 中。

因此,您的预期工作流程不适用:

builder1.<something>(<some intake worker like a spark reciver)
   .to(topic1)
   .start()

相反,您会使用 Kafka Connect 之类的东西将数据导入 Kafka(例如,从数据库到 Kafka 主题)或使用“普通”的 Kafka 生产者客户端(Java、C/C++、Python...)来编写将数据输入到 Kafka。

Kafka Streams 中还没有可用的“钩子”来引导输入数据。我们正在寻找 Kafka Connect 和 Kafka Streams 的更好集成,因此这种情况在不久的将来可能会得到改善。

于 2016-07-01T15:03:04.740 回答
-1

对于 Linux,您可以尝试以下命令:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicName --property "parse.key=true"property "key.separator=:";
  1. parse.key 设置为 true 时允许从控制台接受输入作为键和值对。

  2. key.separator 将设置为键值对的分隔符。

于 2017-10-30T18:28:15.663 回答