以前我一直在使用 0.8 API。当您将主题列表传递给它时,它会返回一个流映射(每个主题一个条目)。这允许我生成一个单独的线程并将每个主题的流分配给它。每个主题中的数据太多,产生一个单独的线程有助于多任务处理。
//0.8 code sample
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
我想升级到 0.10。我检查KafkaStreams
并KafkaConsumer
上课。KafkaConsumer
object 接受配置属性并提供接受主题列表的订阅方法,其返回类型为 void。我找不到可以处理每个主题的方法。
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(topicsList);
conusmer.poll(long ms)
KafkaStreams
另一方面似乎有同样的问题。
KStreamBuilder builder = new KStreamBuilder();
String [] topics = new String[] {"topic1", "topic2"};
KStream<byte[], byte[]> source = builder.stream(stringSerde, stringSerde, topics);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
有source.foreach()
可用的方法,但它是所有主题的流。任何人,任何想法?