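You can create a class that holds the partitioning information and the timestamp at which the message was created, and use it as the key of the Kafka message. You can then use a wrapper Serde to convert this class to a byte array and back, since Kafka only understands bytes. When the message arrives at the consumer as a bag of bytes, you deserialize the key, retrieve the timestamp, and route the message into your logic accordingly.
For example: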
public class KafkaKey implements Serializable {
private long mTimeStampInSeconds;
/* This contains other partitioning data that will be used by the
appropriate partitioner in Kafka. */
private PartitionData mPartitionData;
public KafkaKey(long timeStamp, ...) {
/* Initialize key */
mTimeStampInSeconds = timeStamp;
}
/* Simple getter for timestamp */
public long getTimeStampInSeconds() {
return mTimeStampInSeconds;
}
public static byte[] toBytes(KafkaKey kafkaKey) {
/* Some serialization logic. */
}
public static KafkaKey fromBytes(byte[] kafkaKey) throws Exception {
/* Some deserialization logic. */
}
}
/* Producer End */
/* System.currentTimeMillis() returns milliseconds; the key stores seconds. */
KafkaKey kafkaKey = new KafkaKey(System.currentTimeMillis() / 1000L, ... );
KeyedMessage<byte[], byte[]> kafkaMessage = new KeyedMessage<>(topic, KafkaKey.toBytes(kafkaKey), KafkaValue.toBytes(kafkaValue));
/* Consumer End */
MessageAndMetadata<byte[],byte[]> receivedMessage = (get from consumer);
KafkaKey kafkaKey = KafkaKey.fromBytes(receivedMessage.key());
long timestamp = kafkaKey.getTimeStampInSeconds();
/*
* And happily ever after */
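The serialization and deserialization placeholders in `KafkaKey` could be filled in along these lines. This is only a minimal sketch under stated assumptions: the partitioning data is modeled as a plain `String` (the real `PartitionData` type is not shown here), and the byte layout (8-byte timestamp followed by UTF-8 bytes) is an arbitrary choice for illustration, not something Kafka prescribes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/* Sketch of a key carrying a creation timestamp plus partitioning data.
   Assumption: the partitioning data is a plain String in this example. */
final class KafkaKey {
    private final long mTimeStampInSeconds;
    private final String mPartitionData;

    KafkaKey(long timeStampInSeconds, String partitionData) {
        mTimeStampInSeconds = timeStampInSeconds;
        mPartitionData = partitionData;
    }

    long getTimeStampInSeconds() {
        return mTimeStampInSeconds;
    }

    String getPartitionData() {
        return mPartitionData;
    }

    /* Layout: 8-byte big-endian timestamp, then the UTF-8 bytes of the partition data. */
    static byte[] toBytes(KafkaKey key) {
        byte[] data = key.mPartitionData.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + data.length)
                .putLong(key.mTimeStampInSeconds)
                .put(data)
                .array();
    }

    /* Reverses toBytes: reads the timestamp first, then treats the rest as partition data. */
    static KafkaKey fromBytes(byte[] bytes) {
        if (bytes == null || bytes.length < Long.BYTES) {
            throw new IllegalArgumentException("key is too short to contain a timestamp");
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long timeStampInSeconds = buffer.getLong();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        return new KafkaKey(timeStampInSeconds, new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        KafkaKey sent = new KafkaKey(System.currentTimeMillis() / 1000L, "user-42");
        KafkaKey received = fromBytes(toBytes(sent));
        System.out.println(received.getTimeStampInSeconds() + " " + received.getPartitionData());
    }
}
```

Putting the timestamp at a fixed offset at the front means a consumer can read it without parsing the rest of the key.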
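This is more flexible than making specific partitions correspond to time intervals. Otherwise you would have to keep adding partitions for new time ranges and maintain a separate, synchronized list of which partition corresponds to which time range, which quickly becomes unwieldy.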
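To illustrate why no partition-to-time-range table is needed, the time range can be computed on the consumer side from the timestamp carried in the key. The helper below is a hypothetical sketch (the class `TimeWindows` and its method names are made up for this example) that assumes fixed-size windows measured in seconds.

```java
/* Hypothetical helper: derives the time range from a key's timestamp
   instead of looking it up in a partition-to-range table. */
final class TimeWindows {
    private TimeWindows() {
    }

    /* Start (in epoch seconds) of the fixed-size window containing the timestamp. */
    static long windowStart(long timeStampInSeconds, long windowSizeInSeconds) {
        if (windowSizeInSeconds <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        // floorDiv keeps the result correct for negative timestamps as well.
        return Math.floorDiv(timeStampInSeconds, windowSizeInSeconds) * windowSizeInSeconds;
    }

    /* True if the timestamp lies in the half-open range [fromInclusive, toExclusive). */
    static boolean inRange(long timeStampInSeconds, long fromInclusive, long toExclusive) {
        return timeStampInSeconds >= fromInclusive && timeStampInSeconds < toExclusive;
    }
}
```

A consumer could call `TimeWindows.windowStart(kafkaKey.getTimeStampInSeconds(), 3600)` to group messages by hour, and the window size can change later without touching the topic's partitions.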