我正在尝试设置一个简单的游乐场环境来使用 Flink Python Table API。我最终尝试编写的作业将来自 Kafka 或 Kenesis 队列,但这使得玩弄想法(和测试)变得非常困难。
我可以愉快地从 CSV 加载并以批处理模式对其进行处理。但我无法让它在流媒体模式下工作。我将如何在 StreamingExecutionEnvironment 中做类似的事情(主要是为了让我可以玩 Windows)。
我知道我需要让系统使用 EventTime(因为 ProcTime 会同时出现),但无论如何我都找不到设置它。原则上,我应该能够将 CSV 的列之一设置为事件时间,但文档中不清楚如何执行此操作(或者如果可能的话)。
为了让批处理执行测试运行,我使用了下面的代码,它从一个读取input.csv
并输出到一个output.csv
.
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
TableConfig,
DataTypes,
BatchTableEnvironment,
StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"
try:
out_path.unlink()
except:
pass
from pyflink.table.window import Tumble
(
t_env.connect(FileSystem().path(str(root / "input.csv")))
.with_format(Csv())
.with_schema(
Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
)
.create_temporary_table("mySource")
)
(
t_env.connect(FileSystem().path(str(out_path)))
.with_format(Csv())
.with_schema(
Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
)
.create_temporary_table("mySink")
)
(
t_env.from_path("mySource")
.group_by("word")
.select("word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
t_env.execute("tutorial_job")
和 input.csv 是
2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve
所以我的问题是如何设置它以便它从同一个 CSV 读取,但使用第一列作为事件时间并允许我编写如下代码:
(
t_env.from_path("mySource")
.window(Tumble.over("10.minutes").on("time").alias("w"))
.group_by("w, word")
.select("w, word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
任何帮助将不胜感激,我无法从文档中解决这个问题。我正在使用python 3.7
和flink 1.11.1
。