3

我正在寻找一种如何在两个 Kafka 主题之间分发消息的方法。在原始主题中,我有20 个分区,每个分区有1000000条消息。我想要一个包含1000 个分区的新主题,并将消息传播到新的更广泛的分区范围。

1T -> 20P -> 1000000 messages per partition (total 20m/topic)
2T -> 1000P -> 20000 messages per partition (total 20m/topic)

是否可以在 Kafka 中做到这一点(通过主题镜像或其他技术)?

4

1 回答 1

1

您可以使用Kafka 附带的MirrorMaker (版本 1)。该工具主要用于将数据从一个数据中心复制到另一个数据中心。它建立在主题名称在两个集群中保持相同的假设之上。

但是,您可以提供MessageHandler重命名主题的自定义。

package org.xxx.java;

import java.util.Collections;
import java.util.List;
import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;


/**
 * An example implementation of MirrorMakerMessageHandler that allows to rename topic.
 */
public class TopicRenameHandler implements MirrorMaker.MirrorMakerMessageHandler {
  private final String newName;

  public TopicRenameHandler(String newName) {
    this.newName = newName;
  }

  public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
    return Collections.singletonList(new ProducerRecord<byte[], byte[]>(newName, record.partition(), record.key(), record.value()));
  }
}

我在我的pom.xml文件中使用了以下依赖项

    <properties>
        <kafka.version>2.5.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

编译上面的代码并确保将你的类添加到CLASSPATH

export CLASSPATH=$CLASSPATH:/.../target/MirrorMakerRenameTopics-1.0.jar

现在,连同一些基本的consumer.properties

bootstrap.servers=localhost:9092
client.id=mirror-maker-consumer
group.id=mirror-maker-rename-topic
auto.offset.reset=earliest

producer.properties

bootstrap.servers=localhost:9092
client.id=mirror-maker-producer

你可以调用kafka-mirror-maker如下

kafka-mirror-maker --consumer.config /path/to/consumer.properties \
 --producer.config /path/to/producer.properties \
 --num.streams 1 \
 --whitelist="topicToBeRenamed" \
 --message.handler org.xxx.java.TopicRenameHandler \
 --message.handler.args "newTopicName"

请注意此方法的以下两个注意事项:

  • 当您计划更改分区数时,与旧主题相比,新主题中消息的顺序可能会有所不同。默认情况下,消息按 Kafka 中的键进行分区。
  • 使用 MirrorMaker 不会复制旧主题中的现有偏移量,而是开始写入新的偏移量。因此,旧主题的偏移量与新主题的偏移量之间(几乎)没有关系。
于 2020-05-04T20:23:21.347 回答