
I want to read multiple directories in PyFlink using the Table API:

from pyflink.table import StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    env.set_parallelism(1)

    table_env = StreamTableEnvironment.create(stream_execution_environment=env)
    table_env \
        .get_config() \
        .get_configuration() \
        .set_string("parallelism.default", "1")


    ddl = """
        CREATE TABLE test (
            a INT,
            b STRING
        ) WITH (
            'connector' = 'filesystem',          
            'path' = '{path}', 
            'format' = 'csv',
            'csv.ignore-first-line' = 'true',
            'csv.ignore-parse-errors' = 'true',
            'csv.array-element-delimiter' = ';'
        )
    """.format(path='/opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16')

    table_env.execute_sql(ddl)

but it fails with the following error:

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16 does not exist or the user running Flink ('root') has insufficient permissions to access it.

I'm certain that these three directories exist and that I have permission to access them:
/opt/data/day=2021-11-14
/opt/data/day=2021-11-15
/opt/data/day=2021-11-16

If reading multiple directories isn't possible, I'll have to create three tables and union them, which is much more verbose, as sketched below.
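For illustration, here is a minimal sketch of that workaround, assuming each directory holds the same CSV schema; the table names day_14/day_15/day_16 and the view name all_days are placeholders of my own:

# One source table per directory, then UNION ALL them into a single view.
for name, path in [
    ('day_14', '/opt/data/day=2021-11-14'),
    ('day_15', '/opt/data/day=2021-11-15'),
    ('day_16', '/opt/data/day=2021-11-16'),
]:
    table_env.execute_sql("""
        CREATE TABLE {name} (
            a INT,
            b STRING
        ) WITH (
            'connector' = 'filesystem',
            'path' = '{path}',
            'format' = 'csv'
        )
    """.format(name=name, path=path))

# A single logical table spanning the three physical directories.
table_env.execute_sql("""
    CREATE VIEW all_days AS
    SELECT * FROM day_14
    UNION ALL SELECT * FROM day_15
    UNION ALL SELECT * FROM day_16
""")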

Any suggestions would be appreciated. Thanks.


1 Answer


Just using

'path' = '/opt/data'

should be enough. The filesystem connector is also able to read partition fields and perform filtering based on them. For example, you can define the table with this schema (note that day is a reserved keyword in Flink SQL, so it needs backticks):

CREATE TABLE test (
        a INT,
        b STRING,
        `day` DATE
) PARTITIONED BY (`day`) WITH (
        'connector' = 'filesystem',
        'path' = '/opt/data',
        [...]
)

Then the following query:

SELECT * FROM test WHERE `day` = DATE '2021-11-14'

will read only the files under /opt/data/day=2021-11-14.
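Putting this together in PyFlink, here is a minimal end-to-end sketch under the question's setup; the extra CSV options from the question are omitted for brevity, and the DATE literal syntax is my assumption for comparing against the partition column:

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
table_env = StreamTableEnvironment.create(stream_execution_environment=env)

# Point the table at the parent directory; the day=... subdirectories
# are discovered as values of the `day` partition column.
table_env.execute_sql("""
    CREATE TABLE test (
        a INT,
        b STRING,
        `day` DATE
    ) PARTITIONED BY (`day`) WITH (
        'connector' = 'filesystem',
        'path' = '/opt/data',
        'format' = 'csv'
    )
""")

# Only /opt/data/day=2021-11-14 is scanned thanks to partition pruning.
table_env.execute_sql(
    "SELECT * FROM test WHERE `day` = DATE '2021-11-14'"
).print()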

Answered 2021-11-16T10:13:41.260