0

我正在使用lyft flink 运营商部署的 Flink 1.14

我正在尝试使用 Table API 进行翻转窗口聚合,从事务表源中读取,并将窗口聚合结果放入新的 kafka 主题

我的来源是 debezium 的一个 kafka 主题

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

    //this is the source
    tEnv.executeSql("CREATE TABLE transactions (\n" +
            " event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n"+
            "  transaction_time AS TO_TIMESTAMP_LTZ(4001, 3),\n"+
            "  id INT PRIMARY KEY,\n" +
            "  transaction_status STRING,\n" +
            "  transaction_type STRING,\n" +
            "  merchant_id INT,\n" +
            "  WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
            ") WITH (\n" +
            " 'debezium-json.schema-include' = 'true' ,\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'dbserver1.inventory.transactions',\n" +
            " 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n"+
            " 'format' = 'debezium-json'\n" +
            ")");

我做翻滚窗口并通过以下方式计算同一窗口中的ID:

public static Table report(Table transactions) {
    return transactions
            .window(Tumble.over(lit(2).minutes()).on($("transaction_time")).as("w"))
            .groupBy($("w"), $("transaction_status"))
            .select(
                    $("w").start().as("window_start"),
                    $("w").end().as("window_end"),
                    $("transaction_status"),
                    $("id").count().as("id_count"));
}

水槽是:

tEnv.executeSql("CREATE TABLE my_report (\n" +
            "window_start TIMESTAMP(3),\n"+
            "window_end TIMESTAMP(3)\n,"+
            "transaction_status STRING,\n" +
            " id_count BIGINT,\n" +
            " PRIMARY KEY (window_start) NOT ENFORCED\n"+
            ") WITH (\n" +
            " 'connector' = 'upsert-kafka',\n" +
            " 'topic' = 'dbserver1.inventory.my-window-sink',\n" +
            " 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'key.format' = 'json',\n"+
            " 'value.format' = 'json'\n"+
            ")");
    Table transactions = tEnv.from("transactions");
    Table merchants = tEnv.from("merchants");
    report(transactions).executeInsert("my_report");

问题是当我使用 dbserver1.inventory.my-window-sink 时 kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.my-window-sink --from-beginning我没有得到任何结果,我等待 2 分钟(窗口大小),插入事务表,然后再次等待 2 分钟并再次插入也没有结果。我不知道我的水印是否有问题

我正在使用并行性:2

在 flink 仪表板 UI 上,我可以看到在 GroupWindowAggregate 任务的详细信息中,当我插入表时,收到的记录增加了,但是当我使用主题时,我仍然看不到结果!

4

2 回答 2

1

有了这条线

transaction_time AS TO_TIMESTAMP_LTZ(4001, 3)

您已为每个事件提供相同的交易时间 (4001),并且

WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND

您已安排水印取决于 transaction_time。有了这样的安排,时间就静止了,窗户永远不会关闭。

至于“我等待 2 分钟(窗口大小)”,这不是事件时间处理的工作方式。假设时间戳和水印实际上正在向前移动,那么无论处理 2 分钟的数据需要多长时间,您都需要等待。

于 2021-11-14T15:59:56.620 回答
0

除了 David 谢天谢地的回答之外,我还缺少table.exec.source.idle-timeout作为流环境的配置,这是一个检查源是否空闲的变量。变量的默认值为 0,这意味着它不检查源是否空闲。我将其设置为 1000 毫秒并修复了它,因为它检查了空闲源条件,并且以这种方式正确生成了水印。这可能不会影响具有一致消息摄取的常规流,但对我来说就是这种情况,因为我手动插入记录,因此流很多时候都是空闲的

于 2021-11-21T17:43:18.723 回答