我在 maven 项目中有一个 java 文件,其中包含用于将数据发送到主题的 Kafka 生产者代码。
Producer.java
// some code
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
JSONObject jsonRecord = new JSONObject();
jsonRecord.put("integer", 1);
try {
RecordMetadata recordMetadata = producer.send(new ProducerRecord<String, String>(topicName, null,
mapper.writeValueAsString(jsonRecord))).get();
System.out.println("metadata - topic: " + recordMetadata.topic());
}
catch (Exception e) {
System.out.println("error occurred" + e.getMessage());
}
// some code
我使用命令运行这个文件
mvn exec:java -Dexec.mainClass=Producer
属性是:
public static final String JAAS_SASL_TEMPLATE =
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" %s=\"%s\";"
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
String jaasCfg = String.format(JAAS_SASL_TEMPLATE, "username", "auth_secret_key", "kafka_secret");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasCfg);
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "example.aws.confluent.cloud:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
当我在本地运行此代码时,它能够将数据发送到融合的 Kafka 主题
现在我在 Jenkins 管道中添加了一个阶段来在 shell 脚本中运行 java 文件
stage('Send data') {
steps {
sh '''
mvn exec:java -Dexec.mainClass=Producer
'''
}
}
所有其他功能都有效,但 Kafka 生产者无法将数据发送到融合的 Kafka 主题。也不会抛出任何错误。有人可以帮我解决我所缺少的吗?