3

我创建了这个记录:

new ProducerRecord(topic = "petstore-msg-topic", key = msg.username, value = s"${msg.route},${msg.time}")

我现在想做这样的事情:

CREATE STREAM petstorePages (KEY, route VARCHAR, time VARCHAR) \
                  WITH (KAFKA_TOPIC='petstore-msg-topic', VALUE_FORMAT='DELIMITED');

是否有可能在 Stream 创建中访问密钥,或者我是否必须在值中也包含密钥?

4

1 回答 1

3

它是自动添加的并称为 ROWKEY

KSQL 为每个流和表添加隐式列 ROWTIME 和 ROWKEY,表示对应的 Kafka 消息时间戳和消息键

https://docs.confluent.io/current/ksql/docs/syntax-reference.html#id16

于 2018-08-20T13:13:13.460 回答