3

我在使用 Confluent 4.1 的流和表中重新输入了数据

1)创建流

   CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');

2)创建重新加密的流,因为这个脚本不起作用,但在此之前它的工作,为什么?

CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by root;

然后我创建下一个脚本

CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update  partition by root;

session_details_stream_rekeyed 的结果是好的:

ksql> select * from session_details_stream_rekeyed;
      1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001

3)为主题创建流;

 CREATE STREAM voip_details_stream (SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'voipDetails', value_format = 'json');
 CREATE STREAM voip_details_stream_update as select SessionIdTime ,SessionIdSeq, CONCAT(SESSIONIDTIME,SESSIONIDSEQ) as root from voip_details_stream  partition by SessionIdTime;
 CREATE STREAM voip_details_stream_rekeyed6 as select SessionIdTime ,SessionIdSeq,root from voip_details_stream_update  partition by root;



 ksql> select * from voip_details_stream_rekeyed6;
      1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

4)创建表

 CREATE TABLE voipDetails_table_test(SessionIdTime varchar,SessionIdSeq long,root varchar) WITH (kafka_topic='VOIP_DETAILS_STREAM_REKEYED6', value_format='JSON', KEY='root');

 ksql> select * from voip_details_table;

       1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

5)然后我创建一个左连接

select  c.root,u.root from session_details_stream_rekeyed c LEFT JOIN voipDetails_table_test u On c.root  = u.root;

   1526411477780 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:001 | null

哪里有问题?

4

1 回答 1

6

tl;dr进行流表连接时,您的表消息必须在流消息之前已经存在(并且必须加时间戳)。如果您重新发送源流消息,则在填充表主题后,连接将成功。

示例数据

用于kafkacat填充主题(将数据粘贴到stdin

cat > /tmp/msgs <<EOF
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
kafkacat -b localhost:9092 -P -t sessionDetails /tmp/msgs


cat > /tmp/msgs <<EOF
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
EOF
kafkacat -b localhost:9092 -P -t voipDetails /tmp/msgs

验证主题内容:

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t sessionDetails
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t voipDetails
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}

声明源流

ksql> CREATE STREAM session_details_stream \
      (Media varchar ,SessionIdTime varchar,SessionIdSeq long) \
      WITH (KAFKA_TOPIC = 'sessionDetails', VALUE_FORMAT = 'json');

 Message
----------------
 Stream created
----------------
ksql> CREATE STREAM voip_details_stream \
      (SessionIdTime varchar,SessionIdSeq long, Details varchar) \
      WITH (KAFKA_TOPIC = 'voipDetails', VALUE_FORMAT = 'json');

 Message
----------------
 Stream created
----------------
ksql> select * from session_details_stream;
1526553130864 | null | Foo | 2018-05-17 11:25:33 BST | 1
1526553130865 | null | Foo | 2018-05-17 11:26:33 BST | 2
^CQuery terminated
ksql> select * from voip_details_stream;
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1a
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1b
1526553143176 | null | 2018-05-17 11:26:33 BST | 2 | Bar2
^CQuery terminated

在 SessionIdTime+SessionIdSeq 上重新分区每个主题

ksql> CREATE STREAM SESSION AS \
      SELECT Media, CONCAT(SessionIdTime,SessionIdSeq) AS root \
      FROM session_details_stream \
      PARTITION BY root;

 Message
----------------------------
 Stream created and running
----------------------------


ksql> SELECT ROWTIME, ROWKEY, root, media FROM SESSION;
1526553130864 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Foo
1526553130865 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Foo


ksql> CREATE STREAM VOIP AS \
      SELECT CONCAT(SessionIdTime,SessionIdSeq) AS root, details \
      FROM voip_details_stream \
      PARTITION BY root;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

声明表

ksql> CREATE TABLE VOIP_TABLE (root VARCHAR, details VARCHAR) \
      WITH (KAFKA_TOPIC='VOIP', VALUE_FORMAT='JSON', KEY='root');

 Message
---------------
 Table created
---------------
ksql> SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
1526553143176 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Bar2
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1a
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1b

将 SESSION 流加入 VOIP 表

ksql> SELECT s.ROWTIME, s.root, s.media, v.details \
      FROM SESSION s \
      LEFT OUTER JOIN VOIP_TABLE v ON S.root = V.root;
1526553130864 | 2018-05-17 11:25:33 BST1 | Foo | null
1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null

让上面的 JOIN 查询继续运行。将 SESSION 消息重新发送到源主题(kafkacat用于发送与sessionDetails上述相同的消息):

1526553862403 | 2018-05-17 11:25:33 BST1 | Foo | Bar1a
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2

Per Rohan Desai 在Confluent Community Slack上:

问题是您的流中记录的行时间早于您希望它加入的表中记录的行时间。所以当流记录被处理时,表中没有对应的记录

查看源表上的消息以获取ROWTIME用于查看消息时间戳的连接键之一(不要与基于时间戳的混淆root):

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, details from VOIP WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:23 | 1526553143176 | 2018-05-17 11:26:33 BST2 | Bar2

将此与源会话流主题上的消息进行比较:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, media from SESSION WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:10 | 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo
2018-05-17 11:46:28 | 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo

其中第一个(at 11:32:10/ 1526553130865)在相应VOIP消息的之前(如上所示),并导致null我们首先看到的连接结果。后面的第二个11:46:28( / 1526553988639)产生了我们随后看到的成功连接:

1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
于 2018-05-17T11:12:42.367 回答