I have a simple Spring Boot + Kafka + APM app. I'm using the ELK stack with APM Server, version 7.12.1, and the Java agent co.elastic.apm:apm-agent-attach:1.25.0.

My Elastic APM configuration looks like this:

@Setter
@Configuration
@ConfigurationProperties(prefix = "app.elastic-apm")
public class ElasticApmConfiguration {

    private static final String SERVER_URL_KEY = "server_url";
    private String serverUrl;

    private static final String SERVICE_NAME_KEY = "service_name";
    private String serviceName;

    private static final String ENVIRONMENT_KEY = "environment";
    private String environment;

    private static final String APPLICATION_PACKAGES_KEY = "application_packages";
    private String applicationPackages;

    @PostConstruct
    public void init() {
        Map<String, String> apmProps = new HashMap<>();
        apmProps.put(SERVER_URL_KEY, serverUrl);
        apmProps.put(SERVICE_NAME_KEY, serviceName);
        apmProps.put(ENVIRONMENT_KEY, environment);
        apmProps.put(APPLICATION_PACKAGES_KEY, applicationPackages);

        ElasticApmAttacher.attach(apmProps);
    }
}

I have a @KafkaListener like this:

@Slf4j
@EnableKafka
@Component
@RequiredArgsConstructor
public class CampaignCleanedListener {

    ...deps...

    @KafkaListener(topics = "${app.kafka.topics.mytopic}")
    public void consume(@Payload String input, Acknowledgment acknowledgment) {
        ....my logic....
        acknowledgment.acknowledge();
    }
}

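In the APM charts in Kibana I can see that the average time to process a message is about 500 ms, and everything runs smoothly. However, when I add my custom ConcurrentKafkaListenerContainerFactory like this: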

@Configuration
public class MyConfiguration {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        KafkaAutoConfiguration kafkaAutoConfiguration,
        ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, (ConsumerFactory<Object, Object>) kafkaAutoConfiguration.kafkaConsumerFactory(customizers));

        factory.setBatchListener(true);
        factory.setBatchErrorHandler(new RetryingBatchErrorHandler(new ExponentialBackOff(), null));

        return factory;
    }
}

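it suddenly stops working properly and shows wrong values, such as 3 μs per message. I'm not even sure what it is measuring. Is this a bug, or is it working as intended? If it is intended, how can I make APM work with a custom ConcurrentKafkaListenerContainerFactory?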

Does anyone know what causes this and how to fix it?
