
I am trying to connect to Flink from Python. The data source is PostgreSQL CDC (Change Data Capture).

I am using:

  • postgresql==10.1
  • wal2json
  • kafka==2.12-2.1.0
  • flink==1.13.0
  • debezium==1.13.1.Final

Kafka is running fine: a plain consumer can read the CDC messages. But Flink throws the error below. This annoying problem has been bothering me for a week.
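Since the plain consumer works, one way to narrow the problem down is to check whether the messages on the topic actually have the Debezium envelope fields (`op`, `before`/`after`) that Flink's `debezium-json` format expects, rather than a bare wal2json payload. A minimal sketch of such a check (the `sample` string is hypothetical, shaped like the record fragment visible in the stack trace):

```python
import json

def looks_like_debezium_envelope(payload: str) -> bool:
    """Return True if a Kafka message body carries the Debezium envelope
    fields ("op" plus "before"/"after") that the debezium-json format
    expects; a bare wal2json payload will not have them."""
    try:
        msg = json.loads(payload)
    except json.JSONDecodeError:
        return False
    # When the producer is configured with schemas enabled, the
    # envelope is nested one level down under "payload".
    if isinstance(msg.get("payload"), dict):
        msg = msg["payload"]
    return "op" in msg and ("before" in msg or "after" in msg)

# Hypothetical sample, shaped like the fragment in the error report.
sample = ('{"before":null,"after":{"id":1,"name":"a"},'
          '"source":{"xmin":null},"op":"c","ts_ms":1624873855079,'
          '"transaction":null}')
print(looks_like_debezium_envelope(sample))  # True
```

Feeding each raw message from the consumer through this function shows whether the deserializer is being handed something it cannot parse.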

My code:

DDL = """CREATE TABLE topic_test_slot (
        origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
        event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
        origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
        origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
        origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
        id BIGINT,
        name STRING,
        name1 STRING,
        name2 STRING
    ) WITH (
        'connector'='kafka',
        'topic'='wh_testing_boss_statistics1.public.test_slot',
        'properties.bootstrap.servers'='192.168.2.13:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format'='debezium-json'
    )"""
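One thing worth checking against this DDL: in Flink 1.13 the `debezium-json` format assumes the messages do *not* include the Debezium `schema` wrapper unless told otherwise, and a mismatch can surface as a NullPointerException inside the deserializer. A hypothetical variant of the WITH clause above, only relevant if the producer writes schema-included JSON:

```sql
-- Sketch: same table, with the schema wrapper declared and
-- unparseable records skipped instead of failing the job.
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'wh_testing_boss_statistics1.public.test_slot',
        'properties.bootstrap.servers' = '192.168.2.13:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-json',
        'debezium-json.schema-include' = 'true',
        'debezium-json.ignore-parse-errors' = 'true'
    )
```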

jars = f"file:///home/flink-sql-connector-kafka_2.12-1.13.0.jar;" \
       f"file:///home/flink-json-1.13.0.jar;" \
       f"file:///home/flink-python_2.12-1.13.0.jar;" \
       f"file:///home/flink-sql-connector-postgres-cdc-1.3.0.jar;" \
       f"file:///home/flink-connector-kafka_2.12-1.13.0.jar"


# create a blink stream TableEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()

table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.get_config().get_configuration().set_string("pipeline.jars", jars)
table_env.execute_sql(DDL)
table = table_env.from_path("topic_test_slot")
table_head = table.limit(1)
table_head.to_pandas()

The error report is as follows:

Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
    at org.apache.flink.table.runtime.arrow.ArrowUtils.filterOutRetractRows(ArrowUtils.java:735)
    at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:673)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(...)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 15 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
    ... 17 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
    at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
    ... 17 more
Caused by: org.apache.flink.runtime.JobException: ...46274339048152,"xmin":null},"op":"c","ts_ms":1624873855079,"transaction":null}}'.
    at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:173)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink...

I think the key cause is:

Caused by: java.lang.NullPointerException

But Google does not turn up a definitive answer; most of the answers I found did not work for me.

Any suggestions are appreciated.

