0

我正在尝试设置一个简单的游乐场环境来使用 Flink Python Table API。我最终尝试编写的作业将来自 Kafka 或 Kenesis 队列,但这使得玩弄想法(和测试)变得非常困难。

我可以愉快地从 CSV 加载并以批处理模式对其进行处理。但我无法让它在流媒体模式下工作。我将如何在 StreamingExecutionEnvironment 中做类似的事情(主要是为了让我可以玩 Windows)。

我知道我需要让系统使用 EventTime(因为 ProcTime 会同时出现),但无论如何我都找不到设置它。原则上,我应该能够将 CSV 的列之一设置为事件时间,但文档中不清楚如何执行此操作(或者如果可能的话)。

为了让批处理执行测试运行,我使用了下面的代码,它从一个读取input.csv并输出到一个output.csv.

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    TableConfig,
    DataTypes,
    BatchTableEnvironment,
    StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"

try:
    out_path.unlink()
except:
    pass

from pyflink.table.window import Tumble

(
    t_env.connect(FileSystem().path(str(root / "input.csv")))
    .with_format(Csv())
    .with_schema(
        Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
    )
    .create_temporary_table("mySource")
)

(
    t_env.connect(FileSystem().path(str(out_path)))
    .with_format(Csv())
    .with_schema(
        Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
    )
    .create_temporary_table("mySink")
)

(
    t_env.from_path("mySource")
    .group_by("word")
    .select("word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

t_env.execute("tutorial_job")

和 input.csv 是

2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve

所以我的问题是如何设置它以便它从同一个 CSV 读取,但使用第一列作为事件时间并允许我编写如下代码:

(
    t_env.from_path("mySource")
    .window(Tumble.over("10.minutes").on("time").alias("w"))
    .group_by("w, word")
    .select("w, word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

任何帮助将不胜感激,我无法从文档中解决这个问题。我正在使用python 3.7flink 1.11.1

4

2 回答 2

1

如果使用描述符 API,可以通过 schema 指定一个字段为 event-time 字段:

.with_schema(  # declare the schema of the table
             Schema()
             .field("rowtime", DataTypes.TIMESTAMP())
             .rowtime(
                Rowtime()
                .timestamps_from_field("time")
                .watermarks_periodic_bounded(60000))
             .field("a", DataTypes.STRING())
             .field("b", DataTypes.STRING())
             .field("c", DataTypes.STRING())
         )

但是我还是推荐你使用DDL,一方面更容易使用,另一方面现有的 Descriptor API 存在一些 bug,社区正在讨论重构 Descriptor API

于 2020-08-19T02:53:42.460 回答
0

您是否尝试过使用水印策略?正如这里提到的,您需要使用水印策略来使用事件时间。对于 pyflink 案例,我个人认为像这样以 ddl 格式声明它更容易。

于 2020-08-13T02:44:06.117 回答