I have a few questions about some of the metrics of the Kafka producer (KafkaProducer.metrics()).

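My scenario: I have a driver class that starts 4 independent producers. Each producer runs on its own thread. I collect the metrics automatically via the .metrics() method. The experiment runs for 10 seconds, i.e. each thread is cancelled after 10 seconds, and the main thread then waits a few more seconds to check whether the metrics change because of the busy wait (see the further explanation below).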

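Observation: I found a mismatch between the number of records sent (record-send-total) and record-send-rate. The record-send-total indicates that 21 records were sent during the 10-second experiment (about 2 records/second). The record-send-rate, on the other hand, is 0.52 and never exceeds this value, which, multiplied by the duration of the experiment (10 seconds), would yield only 5-6 records in total.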
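To make the mismatch concrete, the arithmetic behind the two numbers can be sketched as follows. This is only a worked calculation on the values quoted above; the class and method names are illustrative and not part of my project.

```java
public class RateMismatch {

    // Number of records a constant rate would produce over the given duration.
    static double impliedRecords(double ratePerSecond, double durationSeconds) {
        return ratePerSecond * durationSeconds;
    }

    // Average rate needed to reach the given total within the given duration.
    static double impliedRate(double totalRecords, double durationSeconds) {
        return totalRecords / durationSeconds;
    }

    public static void main(String[] args) {
        double total = 21.0;     // observed record-send-total
        double rate = 0.52;      // observed record-send-rate
        double duration = 10.0;  // length of the experiment in seconds

        System.out.printf("records implied by the rate: %.2f%n",
                impliedRecords(rate, duration));
        System.out.printf("rate implied by the total:   %.2f%n",
                impliedRate(total, duration));
    }
}
```

The rate implied by the total is roughly four times the reported record-send-rate, which is the gap I am asking about.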

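I thought to myself that maybe the producers' metrics are collected before the producers are shut down. In fact, increasing the duration of the main thread's busy wait to 5 and 10 seconds decreased record-send-rate to 0.46 and 0.42, respectively. So I assumed that closing the producers right after cancelling the Futures would change the result. To my surprise, it did not.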
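As a rough sanity check on these numbers, one can ask over what time span 21 records would have to be averaged to produce each observed rate. The sketch below assumes that record-send-total stayed at 21 in all three runs (I only quoted it for the first one), and the class and method names are purely illustrative.

```java
public class ImpliedWindow {

    // Time span (in seconds) over which totalRecords would have to be
    // averaged to yield the given rate.
    static double impliedWindowSeconds(double totalRecords, double ratePerSecond) {
        return totalRecords / ratePerSecond;
    }

    public static void main(String[] args) {
        double total = 21.0;                  // assumed identical in every run
        double[] rates = {0.52, 0.46, 0.42};  // observed record-send-rate values

        for (double rate : rates) {
            System.out.printf("rate %.2f -> implied averaging span %.1f s%n",
                    rate, impliedWindowSeconds(total, rate));
        }
    }
}
```

If that assumption holds, the implied spans come out at roughly 40, 46, and 50 seconds, all well beyond the 10-second experiment. That would be consistent with the rate being averaged over a longer interval than the sending period, though this is an inference from the arithmetic rather than something I have confirmed.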

So here are my questions:

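  1. What is the reason for such a low record-send-rate?
  2. Why is there a mismatch between record-send-rate * duration and record-send-total?
  3. Why does the duration of the busy wait in the main thread affect record-send-rate?
  4. Why does the termination of the producers have no effect on the metrics (while the busy wait does)?
  5. Why does closing the producers limit the number of available metrics?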

Here is the ProducerDriver class:

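import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// or jakarta.annotation.PostConstruct, depending on the Spring version
import javax.annotation.PostConstruct;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.stereotype.Component;

@Component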
public class ProducerDriver {

    private final List<ProducerThread> producers = new ArrayList<>();
    private final List<String> topics = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(4);
    private List<Future<?>> futures = new ArrayList<>();

    @PostConstruct
    public void init() {
        start();
    }

    private void start() {

        
        // create producers
        topics.forEach(topic -> producers.add(
                new ProducerThread(new KafkaProducer<>(getProps()), topic)
        ));
        
        // submit producers at scheduled rate
        futures = producers.stream()
                .map(producer -> executor.scheduleAtFixedRate(
                        producer,
                        0,
                        500,
                        TimeUnit.MILLISECONDS
                )).collect(Collectors.toList());

        // cancel the experiment after 10s
        executor.schedule(
                () -> futures.forEach(f -> f.cancel(true)),
                10, TimeUnit.SECONDS);


        // spin until ALL producer tasks have been cancelled before the main thread goes to sleep
        boolean allCancelled = false;
        while (!allCancelled) {
            allCancelled = futures.stream().allMatch(Future::isCancelled);
        }
 
        // This line can be commented out in order to analyse the influence of the termination
        // of producers on the metrics
        producers.forEach(ProducerThread::shutdown);
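        // "busy wait": keep the main thread idle before the metrics are read
        try {
            // Adjust this value to observe the differences in 'record-send-rate'
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            // restore the interrupt flag instead of silently swallowing the exception
            Thread.currentThread().interrupt();
        }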

        List<Map<String, Double>> producersMetrics = new ArrayList<>();
        producers.forEach(p -> producersMetrics.add(p.getRelevantProducerMetrics()));
        // the next two lines are just for setting a breakpoint
        int i = 2;
        i += 1;

    }

    private Properties getProps() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }
}

Here is the ProducerThread class:

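import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ProducerThread implements Runnable {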

    private int failedCount = 0;
    private int messageCounter = 0;
    private List<Long> latencies = new ArrayList<>();

    private KafkaProducer<String, String> producer;
    private String topic;
    private StringGenerator stringGenerator;

    public ProducerThread(KafkaProducer<String, String> producer, String topic) {
        this.producer = producer;
        this.topic = topic;
        this.stringGenerator = new StringGenerator(10);
    }

    @Override
    public void run() {
        send(stringGenerator.createPayloadRandomlyWithSize());
        System.out.println("Current send rate of the producer on thread: " + Thread.currentThread().getId() +
                " is " + getRelevantProducerMetrics().get("record-send-rate"));
    
    }

    public void send(String payload) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, payload);
        final long sendTime = System.nanoTime();
        producer.send(record, (recordMetadata, e) -> {
            if (e == null) {
                messageCounter++;
                latencies.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sendTime));
            } else {
                failedCount++;
            }
        });
    }

    private Map<String, Double> filterProducerMetrics(Map<MetricName,? extends Metric> metrics) {
        Map<String, Double> temp = new HashMap<>();
        String key = "";
        Double val = 0d;
        for (Map.Entry<MetricName,? extends Metric> entry : metrics.entrySet()) {
            key = entry.getKey().name();
            if (RELEVANT_PRODUCER_METRICS.contains(key) &&
                    entry.getKey().group().equals(PRODUCER_METRICS)) {
                val = (Double) entry.getValue().metricValue();
                temp.put(key, temp.getOrDefault(key, 0d) + val);
            }
        }
        return temp;

    }

    public Map<String, Double> getRelevantProducerMetrics() {
        return filterProducerMetrics(this.producer.metrics());
    }

    public KafkaProducer<String, String> getProducer() {
        return producer;
    }

    public static final String PRODUCER_METRICS = "producer-metrics";
    public static List<String> RELEVANT_PRODUCER_METRICS = new ArrayList<>();
    static {
        RELEVANT_PRODUCER_METRICS.add("request-latency-avg");
        RELEVANT_PRODUCER_METRICS.add("request-latency-max");
        RELEVANT_PRODUCER_METRICS.add("request-total");
        RELEVANT_PRODUCER_METRICS.add("request-rate");
        RELEVANT_PRODUCER_METRICS.add("record-send-total");
        RELEVANT_PRODUCER_METRICS.add("records-per-request-avg");
        RELEVANT_PRODUCER_METRICS.add("record-send-rate");
    }

    public void shutdown() {
        this.producer.close();
    }
}

