0

我们使用 Kafka Streams 库进行流处理。但是我们有太多用于连接 kafka 代理的 tcpv6/pipe 开放描述符的问题。

我们使用的 Kafka Stream 代码与上面的代码类似,不同之处在于我们有 10 多个用于 StreamsBuilder 的 KStream。

Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsConfig config = new StreamsConfig(props);

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

如果有人能就这个问题提供任何帮助,我们将不胜感激。谢谢!

4

0 回答 0