我正在使用 pykafka 来消费消息,现在我正在使用 balance_consumer 来消费来自一个主题的消息。现在我必须消费来自另一个主题的消息,如果可以优先消费来自不同主题的消息。我该如何处理这个问题?可能是python的其他库?
问问题
835 次
1 回答
0
我刚刚发布了一篇关于这个问题的帖子。
即使我使用的是 Java,您也可以发现那里描述的概念对您的情况很有用。
我们解决 Kafka 主题优先级问题的方法是 -
我们开发了一种机制来优先考虑 Kafka 主题的消费。这样的机制将检查我们是否要处理从 Kafka 消费的消息,或者将处理留待以后处理。
我们在分区和布尔值之间进行映射,如果需要,它会阻止每个分区的消耗,topicPartitionLocks。阻止初步的,同时继续从迟到的消费,创建主题的优先级。TimerTask 更新此映射,我们的消费者检查他们是否被“允许”消费或必须等待——正如您在方法 waitForLatePartitionIfNeeded 中看到的那样。
public class Prioritizer extends TimerTask {
private Map<String, Boolean> topicPartitionLocks = new ConcurrentHashMap<>();
private Map<String, Long> topicPartitionLatestTimestamps = new ConcurrentHashMap<>();
@Override
public void run(){
updateTopicPartitionLocks();
}
private void updateTopicPartitionLocks() {
Optional<Long> minValue = topicPartitionLatestTimestamps.values().stream().min((o1, o2) -> (int) (o1 - o2));
if(! minValue.isPresent()) {
return;
}
Iterator it = topicPartitionLatestTimestamps.entrySet().iterator();
while (it.hasNext()) {
Boolean shouldLock = false;
Map.Entry<String, Long> pair = (Map.Entry)it.next();
String topicPartition = pair.getKey();
if(pair.getValue() > (minValue.get() + maxGap)) {
shouldLock = true;
if(isSameTopicAsMinPartition(minValue.get(), topicPartition)) {
shouldLock = false;
}
}
topicPartitionLocks.put(topicPartition, shouldLock);
}
}
public boolean isLocked(String topicPartition) {
return topicPartitionLocks.get(topicPartition).booleanValue();
}
}
waitForLatePartitionIfNeeded 方法
private void waitForLatePartitionIfNeeded(final String topic, int partition) {
String topicPartition = topic + partition;
prioritizer.getTopicPartitionLocks.putIfAbsent(topicPartition);
while(prioritizer.isLocked(topicPartition)) {
monitorWaitForLatePartitionTimes(topicPartition, startTime);
Misc.sleep(timeToWaitBetweenGapToTardyPartitionChecks.get());
}
}
使用这个我们导致了重新平衡的增加,所以我们用这个定义来解决它:
我们改变了Kafka中的下一个配置
request.timeout.ms: 7300000 (~2hrs)
max.poll.interval.ms: 7200000 (2hrs)
有关该问题的图表和一般描述,您可以查看我的帖子:
我如何通过优先考虑 Kafka 主题来解决 Kafka 消息的延迟
祝你好运!
于 2018-10-22T17:54:21.357 回答