我正在使用lyft flink 运营商部署的 Flink 1.14
我正在尝试使用 Table API 进行翻转窗口聚合,从事务表源中读取,并将窗口聚合结果放入新的 kafka 主题
我的来源是 debezium 的一个 kafka 主题
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
//this is the source
tEnv.executeSql("CREATE TABLE transactions (\n" +
" event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n"+
" transaction_time AS TO_TIMESTAMP_LTZ(4001, 3),\n"+
" id INT PRIMARY KEY,\n" +
" transaction_status STRING,\n" +
" transaction_type STRING,\n" +
" merchant_id INT,\n" +
" WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'debezium-json.schema-include' = 'true' ,\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'dbserver1.inventory.transactions',\n" +
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n"+
" 'format' = 'debezium-json'\n" +
")");
我做翻滚窗口并通过以下方式计算同一窗口中的ID:
public static Table report(Table transactions) {
return transactions
.window(Tumble.over(lit(2).minutes()).on($("transaction_time")).as("w"))
.groupBy($("w"), $("transaction_status"))
.select(
$("w").start().as("window_start"),
$("w").end().as("window_end"),
$("transaction_status"),
$("id").count().as("id_count"));
}
水槽是:
tEnv.executeSql("CREATE TABLE my_report (\n" +
"window_start TIMESTAMP(3),\n"+
"window_end TIMESTAMP(3)\n,"+
"transaction_status STRING,\n" +
" id_count BIGINT,\n" +
" PRIMARY KEY (window_start) NOT ENFORCED\n"+
") WITH (\n" +
" 'connector' = 'upsert-kafka',\n" +
" 'topic' = 'dbserver1.inventory.my-window-sink',\n" +
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'key.format' = 'json',\n"+
" 'value.format' = 'json'\n"+
")");
Table transactions = tEnv.from("transactions");
Table merchants = tEnv.from("merchants");
report(transactions).executeInsert("my_report");
问题是当我使用 dbserver1.inventory.my-window-sink 时 kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.my-window-sink --from-beginning
我没有得到任何结果,我等待 2 分钟(窗口大小),插入事务表,然后再次等待 2 分钟并再次插入也没有结果。我不知道我的水印是否有问题
我正在使用并行性:2
在 flink 仪表板 UI 上,我可以看到在 GroupWindowAggregate 任务的详细信息中,当我插入表时,收到的记录增加了,但是当我使用主题时,我仍然看不到结果!