0

我有一个 Flink-1.13 程序,它从包含具有不同模式的记录的 Kinesis 流中读取数据。

我的程序遍历流中包含的所有可能模式,过滤主数据帧并使用 StreamingFileSink.forBulkFormat将GenericRecord记录写入 S3。

看起来像:

final DataStream<Event> stream = env.addSource(....)
List<Tuple2<Schema, Integer>> schemasAndIds = ...

schemasAndIds.forEach(tuple -> {

            final GenericRecordAvroTypeInfo genericRecordAvroTypeInfo = new GenericRecordAvroTypeInfo(tuple.f0);

            final SingleOutputStreamOperator<GenericRecord> ds = stream
                    .filter(...)
                    .map(...)
                    .map(...)
                    .filter(...)
                    .map(new ToGenericRecordMap())
                    .returns(genericRecordAvroTypeInfo); //The exception is thrown because of this line.

            final StreamingFileSink<GenericRecord> avroSink = StreamingFileSink
                    .forBulkFormat(
                            new Path("...."),
                            AvroWriters.forGenericRecord(tuple.f0)
                    )
                    .withBucketAssigner(....)
                    .withRollingPolicy(
                            OnCheckpointRollingPolicy.build()
                    )
                    .build();

            ds.addSink(avroSink);

        });

我的问题是,当我通知 ToGenericRecordMap() 生成的类型时(否则它属于 kryo 序列化并出现另一个错误),我得到以下异常:

org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:247)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:238)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 6 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not serialize inputs.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
... 9 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize inputs.
at org.apache.flink.streaming.api.graph.StreamConfig.setInputs(StreamConfig.java:262)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:688)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:460)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:412)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:412)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:412)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:412)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:412)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:412)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:412)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:378)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:179)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:117)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:934)
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:124)
at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.submitAndGetJobClientFuture(EmbeddedExecutor.java:122)
at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.execute(EmbeddedExecutor.java:104)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
at com.openbank.t24reader.T24ReaderJob.main(T24ReaderJob.java:201)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 12 more
Caused by: java.io.UTFDataFormatException
at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.writeUTF(ObjectOutputStream.java:2170)
at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.writeUTF(ObjectOutputStream.java:2013)
at java.base/java.io.ObjectOutputStream.writeUTF(ObjectOutputStream.java:872)
at org.apache.flink.formats.avro.typeutils.SerializableAvroSchema.writeObject(SerializableAvroSchema.java:55)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:546)
at org.apache.flink.streaming.api.graph.StreamConfig.setInputs(StreamConfig.java:260)
... 40 more

有什么问题吗?我错过了什么吗?谢谢你的帮助。

4

0 回答 0