
I just want to check whether my code is wrong or whether this is an issue in pyflink 1.11.X.

When I try to count the number of elements in a group with a query ('GroupBy Aggregation' or 'GroupBy Window Aggregation'), pyFlink throws the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes

If I upgrade to pyflink 1.12.X, I can work around it by casting the type to double. Surprisingly, this works on 1.12 but not on 1.11.
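For reference, the cast workaround on 1.12.X could look roughly like the sketch below. It is only an illustration under stated assumptions: the sink name `print_double`, the helper `run_workaround`, and the choice to express the aggregation in SQL are all hypothetical and not necessarily the exact code that was run.

```python
# Sketch of the cast-to-double workaround (assumed form, for pyflink 1.12.X).
# Assumption: the sink column `res` is declared DOUBLE to match the cast.
# `print_double` is a hypothetical sink name chosen for this example.
SINK_DDL = """
    CREATE TABLE print_double (
        id INT,
        res DOUBLE
    ) WITH (
        'connector' = 'print'
    )
"""

# Count the elements per group in SQL and cast the aggregate explicitly.
WORKAROUND_QUERY = """
    INSERT INTO print_double
    SELECT id, CAST(SUM(1) AS DOUBLE) AS res
    FROM datagen
    GROUP BY id
"""


def run_workaround(table_env):
    """Run the workaround on a TableEnvironment that already has `datagen`."""
    table_env.execute_sql(SINK_DDL)
    # Submit the INSERT job and block until the bounded source is exhausted.
    table_env.execute_sql(WORKAROUND_QUERY) \
        .get_job_client().get_job_execution_result().result()
```

Calling `run_workaround(table_env)` after the `datagen` table from the reproduction below has been created would submit the job.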

Can anyone shed some light on this? Below is an example that reproduces the error.

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# 1. create a TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# 2. create source Table
table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '10'
    )
""")

# 3. create sink Table
table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        res INT
    ) WITH (
        'connector' = 'print'
    )
""")

# 4. query from source table and perform calculations
# create a Table from a Table API query:
source_table = table_env.from_path("datagen")
# or create a Table from a SQL query:
source_table = table_env.sql_query("SELECT id FROM datagen")

result_table = source_table.group_by("id").select("id, 1.sum as res")
table_env.create_temporary_view("output_table", result_table)

# 5. emit results via SQL query:
table_env.execute_sql("INSERT INTO print SELECT * FROM output_table").get_job_client().get_job_execution_result().result()