我们正在使用 ksqldb 0.18 并尝试从这些主题的派生流中以菊花链方式连接一些主题。Afaik 派生主题中的底层消息可以使用 ksql 进行连接和查询,就像我们自己创建主题一样
CREATE STREAM IF NOT EXISTS n_stream (
uuid VARCHAR KEY ,
cluster_id VARCHAR
) WITH (kafka_topic='process_n', value_format='json');
CREATE STREAM IF NOT EXISTS o_stream (
uuid VARCHAR KEY,
project_id VARCHAR,
) WITH (kafka_topic='process_o', value_format='json');
CREATE STREAM IF NOT EXISTS n_plus_o AS
SELECT
n_stream.uuid AS uuid
FROM n_stream
INNER JOIN o_stream WITHIN 1 HOURS ON n_stream.uuid = o_stream.uuid;
但是,当我尝试从中创建一个新主题时,内部连接会静默失败,最后我会收到一部分消息。
CREATE STREAM IF NOT EXISTS n_plus_o_plus_u AS
SELECT
u_stream.uuid AS uuid
FROM n_plus_o
INNER JOIN u_stream WITHIN 10 MINUTES ON n_plus_o.uuid = u_stream.uuid ;
这是预期的行为吗?
TLDR : (A+B) = Z; Z+C 的输出与 A+B+C 不同