I have a simple Spring Boot + Kafka + APM app. I use ELK + APM Server version 7.12.1 and the Java agent co.elastic.apm:apm-agent-attach:1.25.0.
My Elastic APM configuration looks like this:
@Setter
@Configuration
@ConfigurationProperties(prefix = "app.elastic-apm")
public class ElasticApmConfiguration {

    private static final String SERVER_URL_KEY = "server_url";
    private String serverUrl;

    private static final String SERVICE_NAME_KEY = "service_name";
    private String serviceName;

    private static final String ENVIRONMENT_KEY = "environment";
    private String environment;

    private static final String APPLICATION_PACKAGES_KEY = "application_packages";
    private String applicationPackages;

    @PostConstruct
    public void init() {
        Map<String, String> apmProps = new HashMap<>();
        apmProps.put(SERVER_URL_KEY, serverUrl);
        apmProps.put(SERVICE_NAME_KEY, serviceName);
        apmProps.put(ENVIRONMENT_KEY, environment);
        apmProps.put(APPLICATION_PACKAGES_KEY, applicationPackages);
        ElasticApmAttacher.attach(apmProps);
    }
}
I have a @KafkaListener like this:
@Slf4j
@EnableKafka
@Component
@RequiredArgsConstructor
public class CampaignCleanedListener {

    ...deps...

    @KafkaListener(topics = "${app.kafka.topics.mytopic}")
    public void consume(@Payload String input, Acknowledgment acknowledgment) {
        ....my logic....
        acknowledgment.acknowledge();
    }
}
In the APM charts in Kibana I see that the average time to process a message is about 500 ms, and everything runs smoothly. However, when I add my custom ConcurrentKafkaListenerContainerFactory like this:
@Configuration
public class MyConfiguration {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            KafkaAutoConfiguration kafkaAutoConfiguration,
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, (ConsumerFactory<Object, Object>) kafkaAutoConfiguration.kafkaConsumerFactory(customizers));
        factory.setBatchListener(true);
        factory.setBatchErrorHandler(new RetryingBatchErrorHandler(new ExponentialBackOff(), null));
        return factory;
    }
}
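Suddenly it stops working properly and shows values that look wrong, such as 3 μs per message. I'm not even sure what it is measuring. Is this a bug, or is it working as intended? If it is intended, how can I make APM work with a custom ConcurrentKafkaListenerContainerFactory?
Does anyone know what causes this and how to fix it?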
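To cross-check what the 3 μs figure corresponds to, one option is to time the listener body by hand and compare the result with the Kibana chart. Below is a minimal sketch using only the JDK. `ListenerTimer`, `timeNanos`, and `toMicros` are hypothetical names introduced for illustration; they are not part of Spring Kafka or the Elastic APM agent.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Spring Kafka or the Elastic APM agent):
// times a block of code with the JDK's monotonic clock.
final class ListenerTimer {

    private ListenerTimer() {
    }

    // Runs the given work and returns how long it took, in nanoseconds.
    static long timeNanos(Runnable work) {
        long start = System.nanoTime();
        work.run();
        return System.nanoTime() - start;
    }

    // Converts a nanosecond duration to whole microseconds for logging.
    static long toMicros(long nanos) {
        return TimeUnit.NANOSECONDS.toMicros(nanos);
    }
}
```

Wrapping the body of `consume` in `ListenerTimer.timeNanos(...)` and logging the result would show whether the listener itself still takes around 500 ms after the factory change, or whether only the APM figure has changed.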