146
KeyedMessage<String, byte[]> keyedMessage = new KeyedMessage<String, byte[]>(request.getRequestTopicName(), SerializationUtils.serialize(message)); 
producer.send(keyedMessage);

目前,我正在发送没有任何密钥的消息作为密钥消息的一部分,它仍然可以使用delete.retention.ms吗?我需要将密钥作为消息的一部分发送吗?将密钥作为消息的一部分这样好吗?

4

3 回答 3

247

如果您需要强大的密钥顺序并且正在开发诸如状态机之类的东西,则密钥通常是有用的/必要的。如果您要求始终以正确的顺序查看具有相同键(例如,唯一 id)的消息,则将键附加到消息将确保具有相同键的消息始终进入主题中的同一分区。Kafka 保证分区内的顺序,但不保证主题中的分区之间的顺序,因此,或者不提供密钥(这将导致跨分区的循环分配)将不会保持这种顺序。

在状态机的情况下,键可以与log.cleaner.enable一起使用,以对具有相同键的条目进行重复数据删除。在这种情况下,Kafka 假设您的应用程序只关心给定键的最新实例,并且日志清理器仅在该键不为空时才删除给定键的旧副本。这种形式的日志压缩由log.cleaner.delete.retention属性控制,并且需要密钥。

或者,默认启用的更常见的属性log.retention.hours通过删除已过期的完整日志段来工作。在这种情况下,不必提供密钥。Kafka 将简单地删除超过给定保留期的日志块。

这就是说,如果您启用了日志压缩或要求对具有相同密钥的消息进行严格的排序,那么您绝对应该使用密钥。否则,在某些键可能比其他键出现更多的情况下,空键可能会提供更好的分布并防止潜在的热点问题。

于 2015-04-08T13:14:04.767 回答
60

tl; dr 不,向 Kafka 发送消息时不需要密钥。但...


除了非常有用的公认答案之外,我还想添加更多细节

分区

默认情况下,Kafka 使用消息的键来选择它写入的主题的分区。这是在DefaultPartitionerby

kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

如果没有提供密钥,则 Kafka 将以循环方式对数据进行分区。

Partitioner在 Kafka 中,可以通过扩展类来创建自己的 Partitioner 。为此,您需要覆盖partition具有签名的方法:

int partition(String topic, 
              Object key,
              byte[] keyBytes,
              Object value,
              byte[] valueBytes,
              Cluster cluster)

通常,Kafka 消息的key用于选择分区,返回值(类型为int)是分区号。如果没有密钥,您需要依赖处理起来可能要复杂得多的值。

订购

如给定答案中所述,Kafka 仅在分区级别保证消息的排序。

假设您想将客户的金融交易存储在具有两个分区的 Kafka 主题中。消息可能看起来像(键:值)

null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": -1337}
null:{"customerId": 1, "changeInBankAccount": +200}

由于我们没有定义一个键,这两个分区大概看起来像

// partition 0
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}

// partition 1
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": -1337}

您阅读该主题的消费者最终可能会告诉您帐户上的余额在特定时间为 600,尽管情况并非如此!只是因为它在分区 1 中的消息之前读取了分区 0 中的所有消息。

使用有意义的键(如 customerId)可以避免这种情况,因为分区将如下所示:

// partition 0
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": -1337}
1:{"customerId": 1, "changeInBankAccount": +200}

// partition 1
2:{"customerId": 2, "changeInBankAccount": +100}

请记住,分区内的排序只有在生产者配置max.in.flight.requests.per.connection设置为 时才能保证1。但是,该配置的默认值是,5它被描述为:

"客户端在阻塞前将在单个连接上发送的未确认请求的最大数量。请注意,如果此设置设置为大于 1 并且发送失败,则存在由于重试而导致消息重新排序的风险(即,如果启用重试)。”

您可以在另一个 Stackoverflow 文章中找到更多详细信息,关于Kafka - Message Ordering Guarantees

日志压缩

如果没有密钥作为消息的一部分,您将无法将主题配置设置cleanup.policycompacted. 根据文档“日志压缩确保 Kafka 将始终为单个主题分区的数据日志中的每个消息键至少保留最后一个已知值。”。

如果没有任何密钥,这个不错且有用的设置将不可用。

密钥的使用

在实际用例中,Kafka 消息的密钥会对您的性能和业务逻辑的清晰度产生巨大影响。

例如,密钥可以自然地用于对数据进行分区。由于您可以控制您的消费者从特定分区中读取,这可以作为一个有效的过滤器。此外,密钥可以包含一些关于消息实际值的元数据,以帮助您控制后续处理。键通常比值小,因此解析键而不是整个值更方便。同时,您可以应用所有序列化和模式注册,就像使用您的值一样,也可以使用密钥。

作为说明,还有Header的概念可用于存储信息,请参阅文档

于 2020-05-20T11:36:01.287 回答
1

带有消息的密钥基本上是为了获取特定字段的消息排序而发送的。

  • 如果 key=null,数据被循环发送(发送到不同的分区和分布式环境中的不同代理。当然,发送到同一个主题。)。
  • 如果发送了一个密钥,那么该密钥的所有消息将始终发送到同一个分区。

解释和例子

  • key 可以是任何字符串或整数等。以整数employee_id 作为key 的示例。
  • 因此emplyee_id 123 将始终进入分区0,emplyee_id 345 将始终进入分区1。这是由取决于分区数量的密钥散列算法决定的。
  • 如果您不发送任何密钥,则消息可以使用循环技术发送到任何分区。
于 2020-09-28T10:00:26.020 回答