0

我们正在使用 ksqldb 0.18 并尝试从这些主题的派生流中以菊花链方式连接一些主题。Afaik 派生主题中的底层消息可以使用 ksql 进行连接和查询,就像我们自己创建主题一样

CREATE STREAM IF NOT EXISTS n_stream (
    uuid VARCHAR KEY ,
    cluster_id VARCHAR
) WITH (kafka_topic='process_n', value_format='json');
CREATE STREAM IF NOT EXISTS o_stream (
    uuid VARCHAR KEY,
    project_id VARCHAR,
) WITH (kafka_topic='process_o',  value_format='json');
CREATE STREAM IF NOT EXISTS n_plus_o AS
    SELECT 
        n_stream.uuid AS uuid
    FROM n_stream
    INNER JOIN o_stream WITHIN 1 HOURS ON n_stream.uuid = o_stream.uuid;

但是,当我尝试从中创建一个新主题时,内部连接会静默失败,最后我会收到一部分消息。

CREATE STREAM IF NOT EXISTS n_plus_o_plus_u AS
    SELECT 
        u_stream.uuid AS uuid 
    FROM n_plus_o
    INNER JOIN u_stream WITHIN 10 MINUTES ON n_plus_o.uuid = u_stream.uuid ;

这是预期的行为吗?

TLDR : (A+B) = Z; Z+C 的输出与 A+B+C 不同

4

0 回答 0