1

我想使用 Flink 从输入文件中读取,进行一些聚合,然后将结果写入输出文件。作业处于批处理模式。见wordcount.py下文:

from pyflink.table import EnvironmentSettings, BatchTableEnvironment

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

transform_dml = """
INSERT INTO mySink
SELECT word, COUNT(1) FROM mySource GROUP BY word
"""

table_env.execute_sql(my_source_ddl)
table_env.execute_sql(my_sink_ddl)
table_env.execute_sql(transform_dml).wait()

# before run: echo -e  "flink\npyflink\nflink" > /tmp/input
# after run: cat /tmp/output

在运行之前python wordcount.py,我运行echo -e "flink\npyflink\nflink" > /tmp/input以确保 /tmp/input 中存在数据。但是,运行后,/tmp/output 中有两个文件:

> ls /tmp/output
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0 part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
pyflink,1
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
flink,2

虽然我希望单个文件 /tmp/output 包含内容:

pyflink,1
flink,2

实际上,我通过调整以下生成单个文件 /tmp/output.

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()

运行此版本将生成 /tmp/output。请注意,它不带有逗号分隔符。

> cat /tmp/output
flink   2
pyflink 1

知道为什么吗?谢谢!

4

1 回答 1

2

第一次运行它时没有指定并行度,所以你得到了默认的并行度——大于 1(可能是 4 或 8,取决于你的计算机有多少内核)。

Flink 被设计成可扩展的,为了实现这一点,一个算子的并行实例,比如一个接收器,是相互解耦的。例如,想象一下具有 100 或 1000 多个节点的大型集群。为了使其正常工作,每个实例都需要写入自己的文件。

逗号已更改为制表符,因为您指定了.field_delimiter('\t').

于 2021-03-15T09:26:35.157 回答