0

我在 maven 项目中有一个 java 文件,其中包含用于将数据发送到主题的 Kafka 生产者代码。

Producer.java
// some code

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

JSONObject jsonRecord = new JSONObject();
jsonRecord.put("integer", 1);
try {
    RecordMetadata recordMetadata = producer.send(new ProducerRecord<String, String>(topicName, null,
                            mapper.writeValueAsString(jsonRecord))).get();
    System.out.println("metadata - topic: " + recordMetadata.topic());
} 
catch (Exception e) {
        System.out.println("error occurred" + e.getMessage());
}

// some code

我使用命令运行这个文件

mvn exec:java -Dexec.mainClass=Producer

属性是:

public static final String JAAS_SASL_TEMPLATE =
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" %s=\"%s\";"
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
String jaasCfg = String.format(JAAS_SASL_TEMPLATE, "username", "auth_secret_key", "kafka_secret");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasCfg);
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "example.aws.confluent.cloud:9092");       
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

当我在本地运行此代码时,它能够将数据发送到融合的 Kafka 主题

现在我在 Jenkins 管道中添加了一个阶段来在 shell 脚本中运行 java 文件

stage('Send data') {
            steps {
                sh '''
                  mvn exec:java -Dexec.mainClass=Producer
                  '''
                }
            }
        

所有其他功能都有效,但 Kafka 生产者无法将数据发送到融合的 Kafka 主题。也不会抛出任何错误。有人可以帮我解决我所缺少的吗?

4

0 回答 0