I am trying to connect to Flink from Python. The data source is PostgreSQL CDC (Change Data Capture).
I am using:
- postgresql==10.1
- wal2json
- kafka==2.12-2.1.0
- flink==1.13.0
- debezium==1.13.1.Final
Kafka itself works fine: a console consumer can read the CDC messages. But Flink throws the error below, and this problem has been bothering me for a week.
My code:
DDL = """CREATE TABLE topic_test_slot (
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
id BIGINT,
name STRING,
name1 STRING,
name2 STRING
) WITH (
'connector'='kafka',
'topic'='wh_testing_boss_statistics1.public.test_slot',
'properties.bootstrap.servers'='192.168.2.13:9092',
'scan.startup.mode' = 'earliest-offset',
'format'='debezium-json'
)"""
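One thing I am not sure about: depending on how the Debezium connector is configured, each Kafka message may wrap its payload in the Debezium schema envelope, and my understanding from the Flink 1.13 `debezium-json` format docs is that this requires the extra option `'debezium-json.schema-include' = 'true'`. A sketch of the variant DDL I would try (columns elided, only the WITH clause changes):

```python
# Variant of the DDL above: identical columns, one extra format option.
# 'debezium-json.schema-include' = 'true' tells the debezium-json format
# that each Kafka message carries the Debezium "schema" envelope.
DDL_WITH_SCHEMA = """CREATE TABLE topic_test_slot (
    id BIGINT,
    name STRING,
    name1 STRING,
    name2 STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'wh_testing_boss_statistics1.public.test_slot',
    'properties.bootstrap.servers' = '192.168.2.13:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json',
    'debezium-json.schema-include' = 'true'
)"""
```

I have not confirmed whether my connector includes the schema, so this is only a guess at a configuration mismatch.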
jars = "file:///home/flink-sql-connector-kafka_2.12-1.13.0.jar;" \
       "file:///home/flink-json-1.13.0.jar;" \
       "file:///home/flink-python_2.12-1.13.0.jar;" \
       "file:///home/flink-sql-connector-postgres-cdc-1.3.0.jar;" \
       "file:///home/flink-connector-kafka_2.12-1.13.0.jar"
# create a blink stream TableEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.get_config().get_configuration().set_string("pipeline.jars", jars)
table_env.execute_sql(DDL)
table = table_env.from_path("topic_test_slot")
table_head = table.limit(1)
table_head.to_pandas()
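To narrow down whether the failure happens inside `debezium-json` deserialization, I also did a sanity check outside Flink: load one consumed message in plain Python and look at its top-level keys. The record below is a made-up, trimmed example of the shape I believe wal2json + Debezium produces, not my real data:

```python
import json

# Hypothetical, trimmed example of one message read off the topic with a
# plain Kafka consumer -- NOT the real record, just the assumed shape.
raw = '''{"payload": {"before": null,
                      "after": {"id": 1, "name": "a", "name1": "b", "name2": "c"},
                      "source": {"db": "wh_testing_boss_statistics1",
                                 "schema": "public", "table": "test_slot"},
                      "op": "c", "ts_ms": 1624873855079, "transaction": null}}'''

msg = json.loads(raw)
# If everything is nested under "payload", the message carries the Debezium
# envelope; plain 'debezium-json' (without schema-include) would then look
# for "op"/"before"/"after" at the top level and find nothing, which would
# be consistent with a NullPointerException during deserialization.
has_envelope = "payload" in msg and "op" not in msg
print("envelope present:", has_envelope)
```

If the real messages show the same nesting, that would point at a format/envelope mismatch rather than at the jars or the DDL columns.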
The error report is as follows:
Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: java.lang.RuntimeException: Failed to fetch next result
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
	at org.apache.flink.table.runtime.arrow.ArrowUtils.filterOutRetractRows(ArrowUtils.java:735)
	at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:673)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(...)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
	... 15 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
	... 17 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
	... 17 more
Caused by: org.apache.flink.runtime.JobException: ... (message truncated; it ends with the raw record) ...46274339048152,"xmin":null},"op":"c","ts_ms":1624873855079,"transaction":null}}'.
	at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:173)
	at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink. ...
I think the key cause is this line:
Caused by: java.lang.NullPointerException
But Google turns up no definitive answer for it; most of the answers I found did not work for me.
Any advice is appreciated.