2

以前我一直在使用 0.8 API。当您将主题列表传递给它时,它会返回一个流映射(每个主题一个条目)。这允许我生成一个单独的线程并将每个主题的流分配给它。每个主题中的数据太多,产生一个单独的线程有助于多任务处理。

//0.8 code sample
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =  
consumer.createMessageStreams(topicCountMap);

我想升级到 0.10。我检查KafkaStreamsKafkaConsumer上课。KafkaConsumerobject 接受配置属性并提供接受主题列表的订阅方法,其返回类型为 void。我找不到可以处理每个主题的方法。

KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(topicsList);
conusmer.poll(long ms)

KafkaStreams另一方面似乎有同样的问题。

KStreamBuilder builder = new KStreamBuilder();
String [] topics = new String[] {"topic1", "topic2"};
KStream<byte[], byte[]> source = builder.stream(stringSerde, stringSerde, topics);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

source.foreach()可用的方法,但它是所有主题的流。任何人,任何想法?

4

1 回答 1

2

首先,使用多线程使用者很棘手,因此您使用的模式0.8希望设计得很好:)

最佳实践是使用单线程使用者,因此,如果单个使用者一次订阅主题列表,则“无需”分隔不同的主题。然而,在使用记录时,记录对象会提供有关它来自哪个主题的信息(它携带此元数据)。因此,理论上您可以根据其主题将记录分派到不同的线程以进行实际处理(即使不推荐这样做!)。

Kafka 通过分区横向扩展,因此,如果单线程消费者无法处理负载,您应该启动多个消费者(作为消费者组)来横向扩展您的消费者处理能力。

一个更普遍的问题:如果你想处理每个主题的数据,为什么不使用多个消费者,每个消费者都订阅一个主题?

最后但同样重要的是,在 Apache Kafka0.10+中,Kafka Streams API 是一个新引入的流处理库——尽管它不能与 0.8KafkaStream类混淆(提示,没有“s”)。两者完全不相关。

于 2016-10-01T06:27:14.373 回答