29

我是kafka的新手,如果我听起来很愚蠢,请道歉,但到目前为止我的理解是..消息流可以定义为一个主题,就像一个类别。并且每个主题都被划分为一个或多个分区(每个分区可以有多个副本)。所以他们并行行动

他们说从卡夫卡主站点

生产者可以选择将哪条消息分配给主题中的哪个分区。这可以简单地以循环方式完成以平衡负载,也可以根据某些语义分区函数(例如基于消息中的某个键)来完成。

这是否意味着在消费时我将能够从特定分区中选择消息偏移量?在运行多个分区时,是否可以从一个特定分区(即分区 0)中进行选择?

在 Kafka 0.7 quick start他们说

发送带有分区键的消息。具有相同键的消息被发送到同一个分区。

并且可以在创建生产者时提供密钥,如下所示

    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-key", "test-message");
    producer.send(data);

现在我如何根据这个键消费消息?在 Kafka 中生产时使用此密钥的实际影响是什么?

在 0.8beta 中创建生产者时,我们可以通过配置文件提供分区器类属性。可以创建自定义分区器类来实现 kafka 分区器接口。但我有点困惑它究竟是如何工作的。0.8 doc也没有过多解释。有什么建议或遗漏什么吗?

4

3 回答 3

19

Each topic in Kafka is split into many partitions. Partition allows for parallel consumption increasing throughput.

Producer publishes the message to a topic using the Kafka producer client library which balances the messages across the available partitions using a Partitioner. The broker to which the producer connects to takes care of sending the message to the broker which is the leader of that partition using the partition owner information in zookeeper. Consumers use Kafka’s High-level consumer library (which handles broker leader changes, managing offset info in zookeeper and figuring out partition owner info etc implicitly) to consume messages from partitions in streams; each stream may be mapped to a few partitions depending on how the consumer chooses to create the message streams.

For example, if there are 10 partitions for a topic and 3 consumer instances (C1,C2,C3 started in that order) all belonging to the same Consumer Group, we can have different consumption models that allow read parallelism as below

Each consumer uses a single stream. In this model, when C1 starts all 10 partitions of the topic are mapped to the same stream and C1 starts consuming from that stream. When C2 starts, Kafka rebalances the partitions between the two streams. So, each stream will be assigned to 5 partitions(depending on the rebalance algorithm it might also be 4 vs 6) and each consumer consumes from its stream. Similarly, when C3 starts, the partitions are again rebalanced between the 3 streams. Note that in this model, when consuming from a stream assigned to more than one partition, the order of messages will be jumbled between partitions.

Each consumer uses more than one stream (say C1 uses 3, C2 uses 3 and C3 uses 4). In this model, when C1 starts, all the 10 partitions are assigned to the 3 streams and C1 can consume from the 3 streams concurrently using multiple threads. When C2 starts, the partitions are rebalanced between the 6 streams and similarly when C3 starts, the partitions are rebalanced between the 10 streams. Each consumer can consume concurrently from multiple streams. Note that the number of streams and partitions here are equal. In case the number of streams exceed the partitions, some streams will not get any messages as they will not be assigned any partitions.

于 2014-12-15T16:15:19.110 回答
18

这是我到目前为止发现的。

通过实现 kafka Partitioner 接口来定义我们自己的自定义分区器类。实现的方法将有两个参数,第一个是我们从生产者提供的键,然后是可用分区的数量。所以我们可以定义自己的逻辑来设置消息的哪个键去哪个分区。

现在,在创建生产者时,我们可以使用“partitioner.class”属性指定我们自己的分区器类

    props.put("partitioner.class", "path.to.custom.partitioner.class");

如果我们不提及它,那么 Kafka 将使用它的默认类并尝试在可用分区之间平均分配消息。

还通知Kafka如何序列化密钥

    props.put("key.serializer.class", "kafka.serializer.StringEncoder");

现在,如果我们使用生产者中的键发送一些消息,消息将被传递到特定分区(基于我们在自定义分区器类上编写的逻辑),并且在消费者(SimpleConsumer)级别我们可以指定分区来检索具体消息。

如果我们需要将字符串作为键传递,则应在自定义分区器类中进行相同处理(获取键的哈希值,然后获取前两位数等)

于 2013-08-14T15:22:56.153 回答
6

这是否意味着在消费时我将能够从特定分区中选择消息偏移量?在运行多个分区时,是否可以从一个特定分区(即分区 0)中进行选择?

是的,您可以从消费者的一个特定分区中选择消息,但如果您希望动态识别该消息,那么这取决于您在生产者中实现分区器类的逻辑。

现在我如何根据这个键消费消息?在 Kafka 中生产时使用此密钥的实际影响是什么?

有两种消费消息的方式。一个是使用 Zookeeper 主机,另一个是静态主机。Zookeper 主机使用来自所有分区的消息。但是,如果您使用的是静态主机,则可以为代理提供需要使用的分区号。

请查看以下 Kafka 0.8 示例

制片人

KeyedMessage<String, String> data = new KeyedMessage<String, String>(<<topicName>>, <<KeyForPartition>>, <<Message>>);

分区类

   public int partition(Object arg0, int arg1) {
        // arg0 is the key given while producing, arg1 is the number of
        // partition the broker has
        long organizationId = Long.parseLong((String) arg0);
        // if the given key is less than the no of partition available then send
        // it according to the key given Else send it to the last partition
        if (arg1 < organizationId) {

            return (arg1 - 1);
        }
        // return (int) (organizationId % arg1);
        return Integer.parseInt((String) arg0);
    }

因此,partiotioner 类根据您的逻辑决定将消息发送到何处。

消费者(PN:我使用了 Storm Kafka 0.8 集成)

        HostPort hosts = new HostPort("10.**.**.***",9092);

        GlobalPartitionInformation gpi = new GlobalPartitionInformation();
        gpi.addPartition(0, hosts);
        gpi.addPartition(2, hosts);

        StaticHosts statHost = new StaticHosts(gpi);

        SpoutConfig spoutConf = new SpoutConfig(statHost, <<topicName>>, "/kafkastorm", <<spoutConfigId>>);
于 2013-08-28T07:34:08.877 回答