0

我正在尝试添加一个拦截器来验证生产者发布到 Kafka 主题的消息。除了 Kafka 主题执行的模式验证之外,我还需要做一些验证。我遵循的步骤如下。

  1. 我编写了一个扩展 ProducerInterceptor 接口的 Java 类。
  2. 编译类并创建一个 jar 文件,该文件放置在类路径中包含的文件夹中。
  3. 在 Kafka 安装中添加 intercetors.classes= classname 到 producer.properties。

但是当我向主题发布消息时,我编写的自定义拦截器类不会被调用。(我也没有收到任何错误。消息完美地发布到主题)。

我已经提到了https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

请就此提出建议。

4

2 回答 2

1

这个问题已经很老了,所以我假设您同时找到了解决方案。但是,以防万一它对其他人有所帮助,我发现ProducerInterceptor除非我的流已经具有指定的输出,否则不会调用我的类,该类根据消息的内容将消息分派到不同的主题。

我的第一次尝试看起来像这样,因为我认为我不需要指定输出主题。这不起作用:

val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()

但这确实:

val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic").through("dummy-output-topic")

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()

值得注意的是,dummy-output-topic在第二个示例中没有发布任何内容,并且 usingto而不是through似乎也以相同的方式工作。

在我的例子中,我map在使用拦截器将它们分派到不同的主题之前调用更改记录,所以我的代码实际上看起来更像这样:

val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
    .map(new CustomKeyValueMapper)
    .through("dummy-output-topic")

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()

我希望这些例子能帮助任何与ProducerInterceptors 一起犯过我犯过的错误的人。

于 2017-02-16T11:24:55.193 回答
0

属性名称是interceptor.classes,而不是intercetors.classes

于 2016-09-10T14:10:29.653 回答