0

我想确保 flink 在设置方面有效,然后尝试使使用复杂化。在最简单的例子中,我尝试做这些事情。输入包含

column_a,column_b 1,2

输出存在。为了使用 docker 为我的应用程序下载 1.10 的 pyflink 版本,我使用以下代码片段:

 jobmanager:
    image: pyflink/playgrounds:1.10.0
    volumes:
      - ./examples:/opt/examples
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8088:8088"
    command: jobmanager
    environment:
     - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: pyflink/playgrounds:1.10.0
    volumes:
      - ./examples:/opt/examples
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
    - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
import pandas as pd
from inspect import getmembers, isfunction
import os

# create a blink batch TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)
source_ddl = """
                CREATE TABLE MyUserTable (
                        column_a INT PRIMARY KEY,
                        column_b INT,
                        
                        ) WITH (
                          'connector' = 'filesystem',         
                          'path' = 'file:///Users//code/examples/input.csv ', 
                          'format' = 'csv'

                        )"""


#connector for data output/sink
sink_ddl = """
                CREATE TABLE table2 (
                            score INT)
                            WITH (
                                'connector' = 'filesystem',
                                'path' = 'file:///Users//code/examples/output.csv',
                                'format' = 'csv'
                            )"""

#make the table corresponding to the schema mentioned
source_table = table_env.execute_sql(source_ddl)
sink_table = table_env.execute_sql(sink_ddl)

#convert the sql table to table API
table_path = table_env.from_path("MyUserTable")

# execute SELECT statement
table_result2 = table_env.execute_sql("SELECT * FROM MyUserTable")
table_result2.print()

产生的错误如下

  File ".\flink1.py", line 41, in <module>
    source_table = table_env.execute_sql(source_ddl)
  File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\pyflink\table\table_environment.py", line 804, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\py4j\java_gateway.py", line 1285, in __call__
    return_value = get_return_value(
  File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\pyflink\util\exceptions.py", 
line 147, in deco
    return f(*a, **kw)
  File "C:\Users\landr\AppData\Local\Programs\Python\Python38\lib\site-packages\py4j\protocol.py", line 326, 
in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered ")" at line 6, column 25.     
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...

        at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:96)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:722)        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)   
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 6, column 25.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...

        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:450) 
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:213)
        at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
        at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
        at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
        at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
        ... 13 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered ")" at line 6, column 25.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...

        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39782)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39593)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TableColumn(FlinkSqlParserImpl.java:4835)     
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5209)  
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6233)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20934)      
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3415)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3918)      
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:261)  
        at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
        ... 15 more
4

1 回答 1

1

去掉后面的逗号"column_b INT,"

这应该有效。

于 2021-12-12T17:01:51.673 回答