3

我一直在尝试进行一些搜索(Google、Slack、Stack),但还没有找到答案。我们有一些使用Spring Cloud Streams编写的应用程序,并且有兴趣将后端从 Kafka 交换到 Pulsar。Spring 目前没有对 Kafka 的任何原生支持,但是 pulsar 似乎提供了使用 Kafka API 直接与 pulsar 通信的能力(https://pulsar.apache.org/docs/en/adaptors-kafka)。

我想知道是否有人已经尝试Kafka-clients在 Spring 云消息传递的上下文中使用这个替代库来替代库。

当然,另一种有效的方法是重新编写代码——但我想求助于社区,看看是否有人已经走上了这条路。

谢谢

4

1 回答 1

1

虽然不完全符合您的要求,但我尝试使用阴影库 org.apache.pulsar:pulsar-client-kafka:2.5.0 将 Pulsar 与 Spring-Kafka 集成,并且它与此堆栈跟踪有关:

Caused by: java.lang.UnsupportedOperationException: null
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:650) ~[pulsar-client-kafka-2.5.0.jar:2.5.0]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:312) ~[spring-kafka-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:136) ~[spring-kafka-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:292) ~[spring-kafka-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:311) ~[spring-kafka-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:255) ~[spring-kafka-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.10.RELEASE.jar:5.1.10.RELEASE]

可以使用 AbstractMessageListenerContainer.checkTopics 的自定义实现绕过此特定错误(对于这些特定库版本),但您可能会遇到更多问题。

于 2020-01-27T14:11:11.747 回答