0

我是一个新手,试图使用 Confluent Cloud Apache Kafka 在两个 Spring Boot 微服务之间进行通信。

在 Confluent Cloud 上使用 Kafka 时,在 ServiceA 将消息发布到主题后,我的消费者(ServiceB)上出现以下错误。但是,当我登录到我的 Confluent Cloud 时,我看到消息已成功发布到主题。

 org.springframework.context.ApplicationContextException: Failed to start bean 
'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is 
 java.lang.IllegalStateException: Topic(s) [topic-1] is/are not present and 
 missingTopicsFatal is true 

当我在本地服务器上运行 Kafka 时,我不会遇到这个问题。ServiceA 能够将消息发布到我本地 Kafka 服务器上的主题,并且 ServiceB 能够成功地使用该消息。

我在 application.properties 中提到了我的本地 Kafka 服务器配置(作为注释掉的代码)

服务 A:生产者

应用程序属性

app.topic=test-1
#Remote
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"

#Local
#ssl.endpoint.identification.algorithm=https
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN
#request.timeout.ms=20000
#bootstrap.servers=localhost:9092
#retry.backoff.ms=500
#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

发件人.java

public class Sender {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Value("${app.topic}")
private String topic;

public void send(String data){
    Message<String> message = MessageBuilder
            .withPayload(data)
            .setHeader(KafkaHeaders.TOPIC, topic)
            .build();
    kafkaTemplate.send(message);
  }
}

KafkaProducerConfig.java

@Configuration
@EnableKafka
public class KafkaProducerConfig {

@Value("${bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory(producerConfigs());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate(producerFactory());
 }

}

服务 B:消费者

应用程序属性

app.topic=test-1
#Remote
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"

#Local
#ssl.endpoint.identification.algorithm=https
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN
#request.timeout.ms=20000
#bootstrap.servers=localhost:9092
#retry.backoff.ms=500
#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

KafkaConsumerConfig.java

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
  @Value("${bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "confluent_cli_consumer_040e5c14-0c18-4ae6-a10f-8c3ff69cbc1a"); // confluent cloud consumer group-id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory(
            consumerConfigs(),
            new StringDeserializer(), new StringDeserializer());
}

@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
 }
}

KafkaConsumer.java

@Service
public class KafkaConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaListener.class);

@Value("{app.topic}")
private String kafkaTopic;

  @KafkaListener(topics = "${app.topic}", containerFactory = "kafkaListenerContainerFactory")
  public void receive(@Payload String data) {
    LOG.info("received data='{}'", data);
  }
}
4

3 回答 3

0

@cricket_007 的回答是正确的。您需要在sasl.jaas.config属性值中嵌入用户名和密码(特别是集群 API 密钥和 API 机密)。

您可以通过此处的官方示例仔细检查 Java 客户端应如何连接到 Confluent Cloud:https ://github.com/confluentinc/examples/blob/5.3.1-post/clients/cloud/java/src/main/java /io/confluent/examples/clients/云

谢谢,

——里卡多

于 2019-12-05T15:04:11.013 回答
0

用户名和密码是 JAAS 配置的一部分,所以将它们放在一行中

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkaclient1" password="kafkaclient1-secret";

我还建议您验证您的属性文件是否正确加载到客户端

于 2019-12-05T13:55:06.827 回答
0

请参阅引导文档

您不能直接将任意 kafka 属性放在 application.properties 文件中。

自动配置支持的属性见 appendix-application-properties.html。请注意,在大多数情况下,这些属性(连字符或驼峰式)直接映射到 Apache Kafka 点属性。有关详细信息,请参阅 Apache Kafka 文档。

这些属性中的前几个适用于所有组件(生产者、消费者、管理员和流),但如果您希望使用不同的值,可以在组件级别指定。Apache Kafka 指定具有 HIGH、MEDIUM 或 LOW 重要性的属性。Spring Boot 自动配置支持所有 HIGH 重要性属性、一些选定的 MEDIUM 和 LOW 属性以及任何没有默认值的属性。

只有 Kafka 支持的属性的一个子集可以直接通过 KafkaProperties 类获得。如果您希望使用不直接支持的其他属性配置生产者或消费者,请使用以下属性:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

这将通用 prop.one Kafka 属性设置为第一个(适用于生产者、消费者和管理员),prop.two 管理属性设置为第二,prop.three 消费者属性设置为第三,prop.four 生产者属性设置为第四,prop .five 流属性到第五。

...

于 2019-12-05T14:03:38.883 回答