2

有什么方法可以在 Kafka 消息有效负载中添加时间戳标头?我想检查消息何时在消费者端创建并基于此应用自定义逻辑。

编辑

我正在尝试找到一种将一些自定义值(基本上是时间戳)附加到生产者发布的消息的方法,以便我可以在特定时间段内使用消息。现在 Kafka 只确保消息按照放入队列的顺序传递。但在我的情况下,先前生成的记录可能会在一定延迟后到达(因此在时间 T1 生成的消息可能具有比在稍后时间 T2 生成的偏移量为 0 的消息更高的偏移量 1)。出于这个原因,它们不会按照我在消费者端的预期顺序。所以我基本上是在寻找一种方式来以有序的方式消费它们。

当前的 Kafka 0.8 版本除了在生产者端附加“消息密钥”之外没有其他方法,在这里找到了一个类似的主题,建议在消息有效负载中对​​其进行编码。但是我做了很多搜索,但找不到可能的方法。

此外,我不知道这种方法是否对 Kafka 的整体性能有任何影响,因为它在内部管理消息偏移量,并且从这个页面中可以看出,目前还没有公开这样的 API

如果我的想法完全正确,或者有任何可能的方法,我真的很感激任何线索,我都准备试一试

4

4 回答 4

4

如果您想在特定时间段内使用消息,那么我可以为您提供解决方案,但是从该时间段按顺序使用消息是困难的。我也在寻找相同的解决方案。检查以下链接

Kafka Qqueue 中的消息排序

获取特定时间数据的解决方案

对于时间 T1,T2,...TN ,其中 T 是时间范围;将主题划分为 N 个分区。现在使用 Partitioner Class 以这样一种方式生成消息,即消息生成时间应该用于决定应该为该消息使用哪个分区。

同样,在消费时订阅您想要消费的时间范围内的确切分区。

于 2013-09-14T11:03:55.793 回答
1

您可以创建一个包含分区信息和创建此消息时的时间戳的类,然后将其用作 Kafka 消息的键。然后,您可以使用包装器 Serde 将此类转换为字节数组并返回,因为 Kafka 只能理解字节。然后,当您在消费者端接收到作为一袋字节的消息时,您可以对其进行反序列化并检索时间戳,然后将其引导到您的逻辑中。

例如:

public class KafkaKey implements Serializable {
    private long mTimeStampInSeconds;
    /* This contains other partitioning data that will be used by the
    appropriate partitioner in Kafka. */
    private PartitionData mPartitionData;

    public KafkaKey(long timeStamp, ...) {
        /* Initialize key */
        mTimeStampInSeconds = timestamp;
    }

    /* Simple getter for timestamp */
    public long getTimeStampInSeconds() {
        return mTimeStampInSeconds;
    }

    public static byte[] toBytes(KafkaKey kafkaKey) {
        /* Some serialization logic. */
    }

    public static byte[] toBytes(byte[] kafkaKey) throws Exception {
        /* Some deserialization logic. */
    }
}

/* Producer End */

KafkaKey kafkaKey = new KafkaKey(System.getCurrentTimeMillis(), ... );
KeyedMessage<byte[], byte[]> kafkaMessage = new KeyedMessage<>(topic, KafkaKey.toBytes(kafkaKey), KafkaValue.toBytes(kafkaValue));

/* Consumer End */
MessageAndMetadata<byte[],byte[]> receivedMessage = (get from consumer);
KafkaKey kafkaKey = KafkaKey.fromBytes(receivedMessage.key());

long timestamp = kafkaKey.getTimeStampInSeconds();
/*
 * And happily ever after */

这将比使特定分区对应于时间间隔更灵活。否则,您将不得不继续为不同的时间范围添加分区,并保持一个单独的、同步的列表,以显示哪个分区对应于哪个时间范围,这很快就会变得笨拙。

于 2014-08-31T02:41:48.303 回答
0

请注意,Kafka 根据此讨论将时间戳引入消息的内部表示: https ://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message

还有这些票: https ://issues.apache.org/jira/browse/KAFKA-2511

它应该在所有版本的 Kafka0.10.0.0和更高版本中都可用。

这里的问题是您以不再需要的顺序摄取消息。如果订单很重要,那么您需要放弃相关生产者中的并行性。然后消费者级别的问题就消失了。

于 2017-09-14T23:20:25.827 回答
0

看起来会帮助您实现目标。它允许您轻松定义和编写隐藏(反)序列化负担的消息头。您唯一需要提供的是您通过线路发送的实际对象的(反)序列化器。这种实现实际上尽可能地延迟了有效负载对象的反序列化过程,这意味着您可以(以一种非常高效和透明的方式)反序列化标头,检查时间戳,并且仅在当/何时反序列化有效负载(重位)您确定该对象对您有用。

于 2015-07-20T21:19:56.160 回答