
I created a stream stream1 in KSQL (5.0 Beta) with backing topic topic1 and an avro schema. I am able to read all messages on topic1 using kafka-avro-console-consumer.

I then created a second stream in KSQL, named stream2, based on stream1, with messages in json format and backing topic topic2. I am able to read all messages on topic2 using kafka-console-consumer.

I then created stream3 in KSQL, based on stream2, with messages in json format and backing topic topic3. However, I am unable to read any messages on topic3 using kafka-console-consumer.

Using kafkacat I get offsets on the individual partitions of topic3, but no actual messages are printed.

It looks like the messages are in the topic, but neither kafkacat nor kafka-console-consumer can print them.

I tried using --from-beginning and --offset earliest --partition 0 with no luck.

Here are the KSQL statements:

CREATE STREAM stream1(p_id STRING, location STRING, u_id STRING, r_id STRING, b_id STRING, recorded_dtm STRING, 
v_type STRING, value STRING) WITH (kafka_topic='topic1', value_format='AVRO');

CREATE STREAM stream2 WITH (KAFKA_topic='topic2', VALUE_FORMAT='json', TIMESTAMP='RECORDED_TIMESTAMP') 
AS select P_ID+'-'+LOCATION+'-'+U_ID+'-'+R_ID+'-'+B_ID+'-'+V_TYPE as PARTITION_KEY, 
LOCATION, U_ID, R_ID, V_TYPE, B_ID, STRINGTOTIMESTAMP(recorded_dtm, 'yyyyMMddHHmmss') as RECORDED_TIMESTAMP, 
P_ID, VALUE, RECORDED_DTM,'NM' as DATA_TYPE 
FROM stream1 PARTITION BY PARTITION_KEY;

CREATE STREAM stream3 WITH (KAFKA_topic='topic3', VALUE_FORMAT='json', TIMESTAMP='RECORDED_TIMESTAMP') 
AS select PARTITION_KEY, LOCATION, U_ID, R_ID, V_TYPE, B_ID, RECORDED_TIMESTAMP, 
P_ID, VALUE, RECORDED_DTM FROM stream2 PARTITION BY PARTITION_KEY;
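The STRINGTOTIMESTAMP(recorded_dtm, 'yyyyMMddHHmmss') call in the stream2 statement converts the string to epoch milliseconds, and the TIMESTAMP='RECORDED_TIMESTAMP' property then makes that value each Kafka record's timestamp. A minimal Python sketch of that conversion (sample value is illustrative; UTC is assumed here, whereas KSQL uses the server's default timezone unless told otherwise):

```python
from datetime import datetime, timezone

def string_to_timestamp(s: str) -> int:
    """Mimic KSQL's STRINGTOTIMESTAMP for the 'yyyyMMddHHmmss' pattern:
    parse the string and return epoch milliseconds (UTC assumed)."""
    dt = datetime.strptime(s, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

# Illustrative value: 2018-07-10 12:00:00 UTC
millis = string_to_timestamp("20180710120000")
print(millis)  # 1531224000000 -- this becomes the Kafka record timestamp
```

The key point is that the record timestamp is taken from the data itself, not from the wall clock at produce time.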

Additional information

In ksql, if I run SET 'auto.offset.reset'='earliest'; and then run select * from stream1 limit 5; or select * from stream2 limit 5; I see records printed, but select * from stream3 limit 5; returns no records.

If I run describe extended stream3 I get:

Total messages: 212

which is exactly the number of messages I sent to topic1.


1 Answer


The root cause was that the value of the timestamp column on STREAM3, derived from the recorded_dtm field of the messages sent to topic1, was older than log.retention.hours in the Kafka server.properties.

Our log.retention.hours is set to 24 hours, and the recorded_dtm values were more than 24 hours old. This caused the STREAM3 messages to be deleted from topic3 almost immediately by the retention policy.
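The broker's time-based retention can be sketched as: a record (strictly, its log segment) becomes eligible for deletion once its timestamp falls outside the retention window. A simplified Python illustration of that check (the dates and the function are hypothetical, not Kafka's actual implementation):

```python
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(hours=24)  # corresponds to log.retention.hours=24

def eligible_for_deletion(record_ts_ms: int, now: datetime) -> bool:
    """Simplified sketch: a record whose timestamp is older than the
    retention window gets deleted by the broker's log cleanup."""
    record_ts = datetime.fromtimestamp(record_ts_ms / 1000, tz=timezone.utc)
    return now - record_ts > RETENTION

now = datetime(2018, 7, 17, tzinfo=timezone.utc)
# recorded_dtm several days in the past -> record timestamp in the past
old_ts = int(datetime(2018, 7, 10, tzinfo=timezone.utc).timestamp() * 1000)
print(eligible_for_deletion(old_ts, now))  # True: purged on arrival
```

Because stream3's records carried data-derived timestamps from days earlier, they were counted as produced (hence the offsets and the 212-message count) but were purged before any consumer could read them.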

Answered 2018-07-17T10:43:12.257