我正在使用 Spring Kafka 2.3.9 编写一个 Kafka Producer,它假设向一个主题发布大约 200000 条消息。例如,我有一个从数据库中获取的 200000 个对象的列表,我想将这些对象的 json 消息发布到一个主题。
我写的生产者可以很好地发布,比如说,1000 条消息。然后它正在创建一些空指针错误(我已包含下面的屏幕截图)。
调试的时候发现Kafka Producer Network Thread的数量非常多。我数不过来,但肯定超过 500 个。
我已经阅读了Kafka Producer Thread 线程,即使没有发送消息也有大量线程,并通过在 DefaultKafkaProducerFactory 上设置 producerPerConsumerPartition 属性为 false 来进行类似的配置。但它仍然没有减少 Kafka 生产者网络线程数。
下面是我的代码片段、错误和这些线程的图片。我不能发布所有的代码段,因为它来自一个真实的项目。
代码段
public DefaultKafkaProducerFactory<String, String> getProducerFactory() throws IOException, IllegalStateException {
Map<String, Object> configProps = getProducerConfigMap();
DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(configProps);
//defaultKafkaProducerFactory.transactionCapable();
defaultKafkaProducerFactory.setProducerPerConsumerPartition(false);
defaultKafkaProducerFactory.setProducerPerThread(false);
return defaultKafkaProducerFactory;
}
public Map<String, Object> getProducerConfigMap() throws IOException, IllegalStateException {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapAddress());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getKafkaRetryConfig());
configProps.put(ProducerConfig.ACKS_CONFIG, kafkaProperties.getKafkaAcknowledgementConfig());
configProps.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getKafkaClientId());
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 512 * 1024 * 1024);
configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10 * 1000);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//updateSSLConfig(configProps);
return configProps;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
ProducerFactory<String, String> producerFactory = getProducerFactory();
KafkaTemplate<String, String> kt = new KafkaTemplate<String, String>(stringProducerFactory, true);
kt.setCloseTimeout(java.time.Duration.ofSeconds(5));
return kt;
}
错误
2020-12-07 18:14:19.249 INFO 26651 --- [onPool-worker-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=kafka-client-09f48ec8-7a69-4b76-a4f4-a418e96ff68e-1] Closing the Kafka producer with timeoutMillis = 0 ms.
2020-12-07 18:14:19.254 ERROR 26651 --- [onPool-worker-1] c.w.p.r.g.xxxxxxxx.xxx.KafkaPublisher : Exception happened publishing to topic. Failed to construct kafka producer
2020-12-07 18:14:19.273 INFO 26651 --- [ main] ConditionEvaluationReportLoggingListener :
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-12-07 18:14:19.281 ERROR 26651 --- [ main] o.s.boot.SpringApplication : Application run failed
java.lang.IllegalStateException: Failed to execute CommandLineRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
at xxx.xxx.xxx.Application.main(Application.java:46) [classes/:na]
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[na:1.8.0_144]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[na:1.8.0_144]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[na:1.8.0_144]
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[na:1.8.0_144]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_144]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_144]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_144]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_144]
Caused by: java.lang.NullPointerException: null
at com.xxx.xxx.xxx.xxx.KafkaPublisher.publishData(KafkaPublisher.java:124) ~[classes/:na]
at com.xxx.xxx.xxx.xxx.lambda$0(Publisher.java:39) ~[classes/:na]
at java.util.ArrayList.forEach(ArrayList.java:1249) ~[na:1.8.0_144]
at com.xxx.xxx.xxx.xxx.publishData(Publisher.java:38) ~[classes/:na]
at com.xxx.xxx.xxx.xxx.Application.lambda$0(Application.java:75) [classes/:na]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[na:1.8.0_144]
... 5 common frames omitted
以下是发布消息的代码 - 第 124 行是我们实际调用 KafkaTemplate 的时间
public void publishData(Object object) {
ListenableFuture<SendResult<String, String>> future = null;
// Convert the Object to JSON
String json = convertObjectToJson(object);
// Generate unique key for the message
String key = UUID.randomUUID().toString();
// Post the JSON to Kafka
try {
future = kafkaConfig.kafkaTemplate().send(kafkaProperties.getTopicName(), key, json);
} catch (Exception e) {
logger.error("Exception happened publishing to topic. {}", e.getMessage());
}
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
logger.info("Sent message with key=[" + key + "]");
}
@Override
public void onFailure(Throwable ex) {
logger.error("Unable to send message=[ {} due to {}", json, ex.getMessage());
}
});
kafkaConfig.kafkaTemplate().flush();
}
==============================
我不确定这个错误是否是由许多网络线程引起的。
发布数据后,我调用了 KafkaTemplate 刷新方法。那没起效。我还调用了ProducerFactory closeThreadBoundProducer、reset、destroy方法。他们似乎都没有工作。
我是否缺少任何配置?