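tl;dr When doing a stream-table join, your table messages must already exist before the stream messages, and must be timestamped earlier than them. If you resend the source stream messages after the table topic has been populated, the join will succeed.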
Sample data
Populate the topics with kafkacat, feeding the data to it on stdin (one message per line):
cat > /tmp/msgs <<EOF
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
kafkacat -b localhost:9092 -P -t sessionDetails < /tmp/msgs
cat > /tmp/msgs <<EOF
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
EOF
kafkacat -b localhost:9092 -P -t voipDetails < /tmp/msgs
Verify the topic contents:
Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t sessionDetails
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t voipDetails
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
Declare the source streams
ksql> CREATE STREAM session_details_stream \
(Media varchar ,SessionIdTime varchar,SessionIdSeq long) \
WITH (KAFKA_TOPIC = 'sessionDetails', VALUE_FORMAT = 'json');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM voip_details_stream \
(SessionIdTime varchar,SessionIdSeq long, Details varchar) \
WITH (KAFKA_TOPIC = 'voipDetails', VALUE_FORMAT = 'json');
Message
----------------
Stream created
----------------
ksql> select * from session_details_stream;
1526553130864 | null | Foo | 2018-05-17 11:25:33 BST | 1
1526553130865 | null | Foo | 2018-05-17 11:26:33 BST | 2
^CQuery terminated
ksql> select * from voip_details_stream;
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1a
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1b
1526553143176 | null | 2018-05-17 11:26:33 BST | 2 | Bar2
^CQuery terminated
Repartition each topic on SessionIdTime+SessionIdSeq
ksql> CREATE STREAM SESSION AS \
SELECT Media, CONCAT(SessionIdTime,SessionIdSeq) AS root \
FROM session_details_stream \
PARTITION BY root;
Message
----------------------------
Stream created and running
----------------------------
ksql> SELECT ROWTIME, ROWKEY, root, media FROM SESSION;
1526553130864 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Foo
1526553130865 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Foo
ksql> CREATE STREAM VOIP AS \
SELECT CONCAT(SessionIdTime,SessionIdSeq) AS root, details \
FROM voip_details_stream \
PARTITION BY root;
Message
----------------------------
Stream created and running
----------------------------
ksql>
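The join only works if both repartitioned streams end up with identical keys. As a minimal sketch (Python, using a hypothetical `join_key` helper that is not part of KSQL), this is how CONCAT(SessionIdTime, SessionIdSeq) turns a source message into its root key, assuming the BIGINT SessionIdSeq is rendered as its plain decimal string, as the ROWKEY output above suggests:

```python
import json

# Hypothetical helper mirroring CONCAT(SessionIdTime, SessionIdSeq) from the
# CREATE STREAM ... AS SELECT statements. Assumes SessionIdSeq is rendered
# as its decimal string, matching the "...BST1" / "...BST2" keys above.
def join_key(message: str) -> str:
    record = json.loads(message)
    return f"{record['SessionIdTime']}{record['SessionIdSeq']}"

session_msg = '{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}'
voip_msg = '{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}'

# Both sides must produce exactly the same key string, otherwise the
# later stream-table join has nothing to match on.
print(join_key(session_msg))                        # 2018-05-17 11:26:33 BST2
print(join_key(session_msg) == join_key(voip_msg))  # True
```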
Declare the table
ksql> CREATE TABLE VOIP_TABLE (root VARCHAR, details VARCHAR) \
WITH (KAFKA_TOPIC='VOIP', VALUE_FORMAT='JSON', KEY='root');
Message
---------------
Table created
---------------
ksql> SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
1526553143176 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Bar2
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1a
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1b
Join the SESSION stream to the VOIP table
ksql> SELECT s.ROWTIME, s.root, s.media, v.details \
FROM SESSION s \
LEFT OUTER JOIN VOIP_TABLE v ON S.root = V.root;
1526553130864 | 2018-05-17 11:25:33 BST1 | Foo | null
1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null
Leave the JOIN query above running. Resend the SESSION messages to the source topic (using kafkacat to send the same messages to sessionDetails as above):
1526553862403 | 2018-05-17 11:25:33 BST1 | Foo | Bar1a
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
Per Rohan Desai on the Confluent Community Slack:
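The problem is that the row time of the records in your stream is earlier than the row time of the records in the table that you want them to join to. So when the stream records are processed, there are no corresponding records in the table.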
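To make the timing argument concrete, here is a simplified Python model. It is an illustrative assumption, not how KSQL is implemented: events from the table and the stream are replayed in ROWTIME order (Kafka Streams does something similar on a best-effort basis), and each stream record looks up whatever the table holds at that moment. The timestamps are the ones shown above for the key `2018-05-17 11:26:33 BST2`; `left_join` is a hypothetical helper.

```python
# Simplified model (assumption): replay both inputs in ROWTIME order; a stream
# record only sees table rows that were applied before it.
KEY = "2018-05-17 11:26:33 BST2"

table_events = [(1526553143176, KEY, "Bar2")]  # VOIP_TABLE row
stream_events = [
    (1526553130865, KEY, "Foo"),  # original SESSION message
    (1526553988639, KEY, "Foo"),  # same message, resent later
]

def left_join(stream_events, table_events):
    # Tag each event with its side (0 = table, 1 = stream) and sort by ROWTIME.
    # On equal timestamps this model applies the table update first; that
    # tie-break is an arbitrary choice of the sketch.
    events = [(ts, 0, key, val) for ts, key, val in table_events]
    events += [(ts, 1, key, val) for ts, key, val in stream_events]
    table = {}
    results = []
    for ts, side, key, val in sorted(events):
        if side == 0:
            table[key] = val  # upsert: latest value per key
        else:
            # LEFT OUTER JOIN: None (null) if the table has no row yet
            results.append((ts, key, val, table.get(key)))
    return results

for row in left_join(stream_events, table_events):
    print(" | ".join("null" if c is None else str(c) for c in row))
# 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null
# 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
```

The first stream record is processed before the table row exists, so it gets null; the resent copy carries a later ROWTIME and finds Bar2.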
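Look at the message on the source table for one of the join keys, using ROWTIME to see the message timestamp (not to be confused with root, which is itself built from a timestamp):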
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, details from VOIP WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:23 | 1526553143176 | 2018-05-17 11:26:33 BST2 | Bar2
Compare this with the messages on the source SESSION stream topic:
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, media from SESSION WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:10 | 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo
2018-05-17 11:46:28 | 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo
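The first of these (at 11:32:10 / 1526553130865) is earlier than the corresponding VOIP message shown above, and caused the null join result that we saw first. The second, later one (at 11:46:28 / 1526553988639) produced the successful join that we saw subsequently: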
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
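The same timestamp comparison can be sketched in Python. This assumes the KSQL server ran in BST (UTC+01:00), which is what the TIMESTAMPTOSTRING output above implies; `rowtime_to_string` is a hypothetical helper that only approximates TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss').

```python
from datetime import datetime, timedelta, timezone

# Assumption: server time zone was BST (UTC+01:00). A fixed offset is used
# for illustration only; it does not handle daylight-saving transitions.
BST = timezone(timedelta(hours=1))

def rowtime_to_string(rowtime_ms: int) -> str:
    # ROWTIME is epoch milliseconds; format it like 'yyyy-MM-dd HH:mm:ss'
    return datetime.fromtimestamp(rowtime_ms / 1000, tz=BST).strftime("%Y-%m-%d %H:%M:%S")

voip_rowtime = 1526553143176                      # VOIP table message
session_rowtimes = [1526553130865, 1526553988639]  # original and resent SESSION

for ts in session_rowtimes:
    verdict = "after" if ts >= voip_rowtime else "before"
    print(rowtime_to_string(ts), ts, verdict, "VOIP at", rowtime_to_string(voip_rowtime))
# 2018-05-17 11:32:10 1526553130865 before VOIP at 2018-05-17 11:32:23
# 2018-05-17 11:46:28 1526553988639 after VOIP at 2018-05-17 11:32:23
```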