我正在尝试使用 PyFlink 将流放入 csv 格式的文件系统中,但是它不起作用。
# stream_to_csv.py
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.execute_sql("""
CREATE TABLE datagen (
id INT,
data STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
)
""")
table_env.execute_sql("""
CREATE TABLE print (
id INT,
data STRING
) WITH (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/output'
)
""")
table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()
要运行脚本:
$ python stream_to_csv.py
我希望记录转到 /tmp/output 文件夹,但是这不会发生。
$ ~ ls /tmp/output
(nothing shown here)
有什么我想念的吗?