I created stream1 in KSQL (5.0 Beta) with backing topic topic1 and an avro schema. I am able to read all of the messages on topic1 using kafka-avro-console-consumer.
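For reference, the command I use to read topic1 looks roughly like this (the broker and schema registry addresses are placeholders for my setup):

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic1 --from-beginning --property schema.registry.url=http://localhost:8081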
I then created stream2 in KSQL based on stream1, with the messages in json format and a backing topic named topic2. I am able to read all of the messages on topic2 using kafka-console-consumer.
I then created stream3 based on stream2, with the messages in json format and a backing topic named topic3. However, I am not able to read the messages on topic3 using kafka-console-consumer.
Using kafkacat I do get offsets on the various partitions of topic3, but no actual messages are printed. It looks like the messages are in the topic, yet neither kafkacat nor kafka-console-consumer can print them.
I tried --from-beginning and --offset earliest --partition 0, but no luck.
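These are roughly the commands I tried against topic3 (localhost:9092 stands in for my actual broker address):

# console consumer, from the beginning and from an explicit partition/offset
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic3 --from-beginning
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic3 --partition 0 --offset earliest
# kafkacat in consumer mode, starting at the beginning of partition 0
kafkacat -b localhost:9092 -C -t topic3 -p 0 -o beginning

All of them print nothing, even though the end offsets suggest the topic is not empty.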
Here are the KSQL statements:
CREATE STREAM stream1 (p_id STRING, location STRING, u_id STRING, r_id STRING, b_id STRING, recorded_dtm STRING,
v_type STRING, value STRING) WITH (kafka_topic='topic1', value_format='AVRO');

CREATE STREAM stream2 WITH (kafka_topic='topic2', value_format='json', timestamp='RECORDED_TIMESTAMP')
AS SELECT P_ID+'-'+LOCATION+'-'+U_ID+'-'+R_ID+'-'+B_ID+'-'+V_TYPE AS PARTITION_KEY,
LOCATION, U_ID, R_ID, V_TYPE, B_ID, STRINGTOTIMESTAMP(recorded_dtm, 'yyyyMMddHHmmss') AS RECORDED_TIMESTAMP,
P_ID, VALUE, RECORDED_DTM, 'NM' AS DATA_TYPE
FROM stream1 PARTITION BY PARTITION_KEY;

CREATE STREAM stream3 WITH (kafka_topic='topic3', value_format='json', timestamp='RECORDED_TIMESTAMP')
AS SELECT PARTITION_KEY, LOCATION, U_ID, R_ID, V_TYPE, B_ID, RECORDED_TIMESTAMP,
P_ID, VALUE, RECORDED_DTM FROM stream2 PARTITION BY PARTITION_KEY;
Additional information
In ksql, if I run SET 'auto.offset.reset'='earliest'; and then run select * from stream1 limit 5; or select * from stream2 limit 5; I see records printed, but select * from stream3 limit 5; does not return any records.
If I run describe extended stream3, I get

Total messages: 212

which is exactly the number of messages I sent to topic1.