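I just want to check whether my code is wrong or whether this is an issue in pyflink 1.11.X.
When I try to count the number of elements in a group with a query ('GroupBy Aggregation' or 'GroupBy Window Aggregation'), PyFlink throws the following error: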
py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes
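If I upgrade to pyflink 1.12.X, I can work around it by casting the type to double. Surprisingly, that works on 1.12 but not on 1.11.
Can anyone shed some light on this? Below is an example that reproduces the error.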
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
# 1. create a TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
# 2. create source Table
table_env.execute_sql("""
CREATE TABLE datagen (
id INT,
data STRING
) WITH (
'connector' = 'datagen',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '10'
)
""")
# 3. create sink Table
table_env.execute_sql("""
CREATE TABLE print (
id INT,
res INT
) WITH (
'connector' = 'print'
)
""")
# 4. query from source table and perform calculations
# create a Table from a Table API query:
source_table = table_env.from_path("datagen")
# or create a Table from a SQL query:
source_table = table_env.sql_query("SELECT id FROM datagen")
result_table = source_table.group_by("id").select("id, 1.sum as res")
table_env.create_temporary_view("output_table", result_table)
# 5. emit results via SQL query:
table_env.execute_sql("INSERT INTO print SELECT * FROM output_table").get_job_client().get_job_execution_result().result()
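
To make the intent of the query explicit: `1.sum` grouped by `id` is meant to add the literal 1 once per row, which gives the number of elements in each group. The following is only a plain-Python sketch of that expected result, not PyFlink code; the helper name `count_per_group` is hypothetical and is used here just for illustration.

```python
from collections import defaultdict


def count_per_group(ids):
    """Add 1 for every row, grouped by id (mirrors the intent of `1.sum`)."""
    totals = defaultdict(int)
    for row_id in ids:
        totals[row_id] += 1
    return dict(totals)


# The datagen source above emits the sequence 1..10, so every id appears once.
expected = count_per_group(range(1, 11))
print(expected)
```

Since each id in the sequence occurs exactly once, every group should end up with a count of 1.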