我对 kafka 生产者的一些指标（通过 `KafkaProducer.metrics()` 获取）有一些疑问。
我的场景：我有一个驱动程序类，它会启动 4 个独立的生产者，每个生产者都在自己的线程上运行。我通过 `.metrics()` 方法自动收集指标。
实验运行 10 秒,即每个线程在 10 秒后被取消,主线程再等待几秒以检查指标是否因忙等待而发生变化(见进一步说明)。
观察：我发现发送的记录数（`record-send-total`）与 `record-send-rate` 之间存在差异。`record-send-total` 表明在 10 秒的实验中共发送了 21 条记录（约 2 条记录/秒）。另一方面，`record-send-rate` 为 0.52，并且从未超过这个值；将其乘以实验的持续时间（10 秒），总共只相当于 5–6 条记录。
我心想，也许生产者的指标是在生产者关闭之前收集的。事实上，将主线程忙等待的持续时间分别增加到 5 秒和 10 秒后，`record-send-rate` 分别降到了 0.46 和 0.42。因此，我认为在取消各个 `Future` 之后立即关闭生产者会改变结果。令我惊讶的是，结果并没有改变。
所以，我的问题如下：
- `record-send-rate` 如此之低的原因是什么？
- 为什么 `record-send-rate` × 持续时间与 `record-send-total` 之间存在差异？
- 为什么主线程中忙等待的持续时间会影响 `record-send-rate`？
- 为什么生产者的终止对指标没有影响（但忙等待有）？
- 为什么关闭生产者会限制可用指标的数量？
这是 `ProducerDriver` 类：
@Component
public class ProducerDriver {
    /** Length of the sending phase of the experiment. */
    private static final long EXPERIMENT_SECONDS = 10;
    /** How long the main thread idles after stopping the producers, before metrics are read. */
    private static final long IDLE_SECONDS = 10;
    /** Upper bound for an interrupted, in-flight producer run to return after cancellation. */
    private static final long TERMINATION_TIMEOUT_SECONDS = 30;

    private final List<ProducerThread> producers = new ArrayList<>();
    private final List<String> topics = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
    // one scheduler thread per topic (topics is initialized above, so its size is available here)
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(topics.size());
    private List<Future<?>> futures = new ArrayList<>();

    /**
     * Runs the whole experiment during bean initialization. Note that this blocks the caller
     * for at least {@code EXPERIMENT_SECONDS + IDLE_SECONDS}.
     */
    @PostConstruct
    public void init() {
        start();
    }

    /**
     * Runs the experiment.
     *
     * <p>Each producer sends one record every 500 ms for {@link #EXPERIMENT_SECONDS}. The producer
     * tasks are then cancelled and awaited, and the producers are closed. The main thread idles
     * for {@link #IDLE_SECONDS}, and finally the relevant metrics of every producer are collected
     * and printed.
     */
    private void start() {
        // create one producer per topic
        topics.forEach(topic -> producers.add(
                new ProducerThread(new KafkaProducer<>(getProps()), topic)
        ));

        // submit producers at a fixed rate of one run every 500 ms, starting immediately
        futures = producers.stream()
                .map(producer -> executor.scheduleAtFixedRate(
                        producer,
                        0,
                        500,
                        TimeUnit.MILLISECONDS
                )).collect(Collectors.toList());

        // Let the experiment run, then cancel every producer task. Blocking in sleep instead of
        // polling isCancelled() in a loop keeps a CPU core free.
        sleepSeconds(EXPERIMENT_SECONDS);
        futures.forEach(f -> f.cancel(true));

        // cancel(true) only interrupts an in-flight run(); it does not wait for it to return.
        // Shutting the executor down and awaiting termination guarantees that no send() is still
        // executing when the producers are closed below, and it releases the pool threads.
        executor.shutdown();
        awaitExecutorTermination();

        // This line can be commented out in order to analyse the influence of the termination
        // of producers on the metrics
        producers.forEach(ProducerThread::shutdown);

        // Idle wait: the main thread sleeps and nothing spins. Adjust IDLE_SECONDS to observe
        // the differences in 'record-send-rate'.
        // NOTE(review): record-send-rate appears to be a windowed rate that Kafka evaluates at
        // read time. It divides by the elapsed window, which with the default
        // metrics.sample.window.ms / metrics.num.samples settings is padded by about 30 s. So a
        // later read yields a lower value while record-send-total stays unchanged. Confirm this
        // against the Kafka metrics documentation.
        sleepSeconds(IDLE_SECONDS);

        List<Map<String, Double>> producersMetrics = producers.stream()
                .map(ProducerThread::getRelevantProducerMetrics)
                .collect(Collectors.toList());
        producersMetrics.forEach(System.out::println);
    }

    /** Waits for running producer tasks to finish, forcing a shutdown if they exceed the timeout. */
    private void awaitExecutorTermination() {
        try {
            if (!executor.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Sleeps for {@code seconds}; if interrupted, restores the interrupt flag and returns early. */
    private static void sleepSeconds(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Builds the configuration shared by every producer: local broker, String keys and values. */
    private Properties getProps() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }
}
这是 `ProducerThread` 类：
public class ProducerThread implements Runnable {
    /** Metric group whose entries are considered by {@link #getRelevantProducerMetrics()}. */
    public static final String PRODUCER_METRICS = "producer-metrics";
    /** Names of the metrics (within {@link #PRODUCER_METRICS}) that are collected. */
    public static List<String> RELEVANT_PRODUCER_METRICS = new ArrayList<>();
    static {
        RELEVANT_PRODUCER_METRICS.add("request-latency-avg");
        RELEVANT_PRODUCER_METRICS.add("request-latency-max");
        RELEVANT_PRODUCER_METRICS.add("request-total");
        RELEVANT_PRODUCER_METRICS.add("request-rate");
        RELEVANT_PRODUCER_METRICS.add("record-send-total");
        RELEVANT_PRODUCER_METRICS.add("records-per-request-avg");
        RELEVANT_PRODUCER_METRICS.add("record-send-rate");
    }

    // Outcome statistics, updated only from the send() callback; not read within this class.
    private int failedCount = 0;
    private int messageCounter = 0;
    private final List<Long> latencies = new ArrayList<>();

    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final StringGenerator stringGenerator;

    /**
     * Creates a sender for one topic.
     *
     * @param producer producer used for sending; expected to be a {@code KafkaProducer<String, String>}.
     *                 The raw parameter type is kept so existing callers still compile.
     * @param topic    topic every record is sent to
     */
    @SuppressWarnings("unchecked") // raw KafkaProducer argument stored in the typed field
    public ProducerThread(KafkaProducer producer, String topic) {
        this.producer = producer;
        this.topic = topic;
        this.stringGenerator = new StringGenerator(10);
    }

    /**
     * Sends one generated payload and prints this producer's current {@code record-send-rate}.
     *
     * <p>Runtime exceptions are caught and reported rather than propagated. When this runnable is
     * scheduled with {@code scheduleAtFixedRate}, an escaping exception silently suppresses every
     * subsequent run.
     */
    @Override
    public void run() {
        try {
            send(stringGenerator.createPayloadRandomlyWithSize());
            System.out.println("Current send rate of the producer on thread: " + Thread.currentThread().getId() +
                    " is " + getRelevantProducerMetrics().get("record-send-rate"));
        } catch (RuntimeException e) {
            System.err.println("Producer for topic " + topic + " failed to send a record: " + e);
        }
    }

    /**
     * Sends {@code payload} to this thread's topic without waiting for the result.
     * The callback counts successes and failures and records the latency of each successful send.
     */
    public void send(String payload) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, payload);
        final long sendTime = System.nanoTime();
        producer.send(record, (recordMetadata, e) -> {
            if (e == null) {
                messageCounter++;
                latencies.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sendTime));
            } else {
                failedCount++;
            }
        });
    }

    /**
     * Keeps only the {@link #RELEVANT_PRODUCER_METRICS} of the {@link #PRODUCER_METRICS} group,
     * keyed by metric name. Values sharing a name are summed. Non-numeric values are skipped
     * instead of failing on a cast.
     */
    private Map<String, Double> filterProducerMetrics(Map<MetricName, ? extends Metric> metrics) {
        Map<String, Double> relevant = new HashMap<>();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName metricName = entry.getKey();
            if (!PRODUCER_METRICS.equals(metricName.group())
                    || !RELEVANT_PRODUCER_METRICS.contains(metricName.name())) {
                continue;
            }
            Object value = entry.getValue().metricValue();
            if (value instanceof Number) {
                relevant.merge(metricName.name(), ((Number) value).doubleValue(), Double::sum);
            }
        }
        return relevant;
    }

    /** Returns a snapshot of the relevant metrics of the wrapped producer. */
    public Map<String, Double> getRelevantProducerMetrics() {
        return filterProducerMetrics(this.producer.metrics());
    }

    /** Returns the wrapped producer. */
    public KafkaProducer<String, String> getProducer() {
        return producer;
    }

    /** Closes the wrapped producer. */
    public void shutdown() {
        this.producer.close();
    }
}