I want to read multiple directories with the Table API in PyFlink:
from pyflink.table import StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

if __name__ == '__main__':
    # Run the job in batch mode with a parallelism of 1
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    env.set_parallelism(1)
    table_env = StreamTableEnvironment.create(stream_execution_environment=env)
    table_env \
        .get_config() \
        .get_configuration() \
        .set_string("parallelism.default", "1")

    ddl = """
        CREATE TABLE test (
            a INT,
            b STRING
        ) WITH (
            'connector' = 'filesystem',
            'path' = '{path}',
            'format' = 'csv',
            'csv.ignore-first-line' = 'true',
            'csv.ignore-parse-errors' = 'true',
            'csv.array-element-delimiter' = ';'
        )
    """.format(path='/opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16')
    # Attempt: pass all three directories as one comma-separated 'path' value
    table_env.execute_sql(ddl)
But it fails with the following error:
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16 does not exist or the user running Flink ('root') has insufficient permissions to access it.
I am sure that these three directories exist and that I have permission to access them:
/opt/data/day=2021-11-14
/opt/data/day=2021-11-15
/opt/data/day=2021-11-16
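The error message shows that Flink looked for the whole comma-joined string as one single path. A minimal standard-library sketch to confirm that each directory exists on its own while the joined string does not is shown below; the helper `check_paths` is hypothetical and not part of PyFlink, and the demo uses temporary directories instead of `/opt/data`.

```python
import os
import tempfile
from typing import Dict


def check_paths(joined: str) -> Dict[str, bool]:
    """Report whether each comma-separated component, and the joined string itself, is an existing directory."""
    result = {p: os.path.isdir(p) for p in joined.split(",")}
    # The full string is what the connector tried to open, per the error message
    result[joined] = os.path.isdir(joined)
    return result


if __name__ == "__main__":
    # Demo with temporary stand-ins for the /opt/data/day=... directories
    with tempfile.TemporaryDirectory() as root:
        dirs = [os.path.join(root, f"day=2021-11-{d}") for d in (14, 15, 16)]
        for d in dirs:
            os.makedirs(d)
        for path, exists in check_paths(",".join(dirs)).items():
            print(exists, path)
```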
If it is not possible to read multiple directories, I will have to create three tables and union them, which is much more verbose.
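The "three tables plus a union" fallback can at least be generated rather than written by hand. The sketch below assumes the same schema and `WITH` options as in the question; `build_ddls`, `build_union_view`, and the `test_0`, `test_1`, ... table names are hypothetical. It only builds SQL strings, so it runs without PyFlink; each string would then be passed to `table_env.execute_sql`.

```python
from typing import List

# Example directories, taken from the question
DIRS = [
    "/opt/data/day=2021-11-14",
    "/opt/data/day=2021-11-15",
    "/opt/data/day=2021-11-16",
]

# Same schema and WITH options as in the question; only the table name and path vary
DDL_TEMPLATE = """CREATE TABLE {name} (
    a INT,
    b STRING
) WITH (
    'connector' = 'filesystem',
    'path' = '{path}',
    'format' = 'csv',
    'csv.ignore-first-line' = 'true',
    'csv.ignore-parse-errors' = 'true',
    'csv.array-element-delimiter' = ';'
)"""


def build_ddls(dirs: List[str], prefix: str = "test") -> List[str]:
    """Return one CREATE TABLE statement per directory: test_0, test_1, ..."""
    return [DDL_TEMPLATE.format(name=f"{prefix}_{i}", path=d) for i, d in enumerate(dirs)]


def build_union_view(count: int, prefix: str = "test", view: str = "test") -> str:
    """Return a CREATE TEMPORARY VIEW statement that UNION ALLs the per-directory tables."""
    selects = [f"SELECT a, b FROM {prefix}_{i}" for i in range(count)]
    return f"CREATE TEMPORARY VIEW {view} AS\n" + "\nUNION ALL\n".join(selects)


if __name__ == "__main__":
    statements = build_ddls(DIRS) + [build_union_view(len(DIRS))]
    for stmt in statements:
        # With PyFlink, each statement would be passed to table_env.execute_sql(stmt)
        print(stmt, end="\n\n")
```

Queries can then read from the view `test` as if it were a single table, and adding a day only means appending to `DIRS`.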
Any suggestions are appreciated. Thanks!
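Since the directory names follow the Hive-style `key=value` layout, one alternative worth trying is to point the filesystem connector at the parent directory `/opt/data` and declare `day` as a partition column, then filter on it. This is a sketch under assumptions: it assumes the Flink version in use infers filesystem partitions from `day=...` directory names, that partition values come from the directory names rather than the CSV contents, and it reuses the question's CSV options. The helper names `build_partitioned_ddl` and `build_day_query` are hypothetical and only build SQL strings.

```python
from typing import List


def build_partitioned_ddl(root: str) -> str:
    """CREATE TABLE over the parent directory, with `day` as a Hive-style partition column."""
    # `day` is backtick-quoted because DAY is a reserved keyword in Flink SQL
    return f"""CREATE TABLE test (
    a INT,
    b STRING,
    `day` STRING
) PARTITIONED BY (`day`) WITH (
    'connector' = 'filesystem',
    'path' = '{root}',
    'format' = 'csv',
    'csv.ignore-first-line' = 'true',
    'csv.ignore-parse-errors' = 'true',
    'csv.array-element-delimiter' = ';'
)"""


def build_day_query(days: List[str]) -> str:
    """SELECT restricted to the given partitions via a filter on `day`."""
    in_list = ", ".join(f"'{d}'" for d in days)
    return f"SELECT a, b, `day` FROM test WHERE `day` IN ({in_list})"


if __name__ == "__main__":
    # With PyFlink: table_env.execute_sql(ddl), then table_env.execute_sql(query).print()
    print(build_partitioned_ddl("/opt/data"))
    print(build_day_query(["2021-11-14", "2021-11-15", "2021-11-16"]))
```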