0

是否可以像使用 CSV 一样在 Table API 和/或 DataStream API 中使用 JSON 文件接收器?

谢谢 !

代码

my_sink_ddl = f"""
    create table mySink (
        id STRING,
        dummy_item STRING
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'json',
        'connector.path' = 'output.json'
    )
"""

错误

TableException: findAndCreateTableSink failed.
4

1 回答 1

4

是的,根据 Jira FLINK-17286 Integrate json to file system connector和相应的 pull request [ FLINK-17286][connectors / filesystem]Integrate json to file system connector #12010,可以从 Flink 开始1.11。在 Flink 之前,1.11我认为它不受支持。

您需要使用以下配置:

... with (
        'connector' = 'filesystem',
        'format' = 'json',
        'path' = 'output_json' -- This must be a directory
    )

加上以下环境定义:

t_env = BatchTableEnvironment.create(   environment_settings=EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()) 
于 2020-11-06T07:09:24.930 回答