我正在尝试使用 Flink 中的 RollingSink 将序列化为 AVRO 的案例类写入 HDFS。为了使 avro 文件可以被 HDFS 反序列化,我使用了包装 FSDataOutputStream 的 DataFileWriter。当我尝试在 DataFileWriter 和 FSDataOutputStream 之间进行同步以关闭 HDFS 上的数据文件时,会引发异常,实际上我在每个其他文件中都获取了数据。有没有办法在 Flink Writer 实现中将 fs 流与 Avro writer 同步?
我曾尝试使用 DataFileWriter close() flush() sync() fsync() 但都失败了。同步方法似乎表现最好。我也尝试过在 write 方法中同步,这似乎有效,但仍然产生错误,我无法验证是否所有数据都保存到文件中。
class AvroWriter[OutputContainer <: org.apache.avro.specific.SpecificRecordBase] extends Writer[OutputContainer] {
val serialVersionUID = 1L
var outputStream: FSDataOutputStream = null
var outputWriter: DataFileWriter[OutputContainer] = null
override def open(outStream: FSDataOutputStream): Unit = {
if (outputStream != null) {
throw new IllegalStateException("AvroWriter has already been opened.")
}
outputStream = outStream
if(outputWriter == null) {
val writer: DatumWriter[OutputContainer] = new SpecificDatumWriter[OutputContainer](OutputContainer.SCHEMA$)
outputWriter = new DataFileWriter[OutputContainer](writer)
outputWriter.create(OutputContainer.SCHEMA$, outStream)
}
}
override def flush(): Unit = {}
override def close(): Unit = {
if(outputWriter != null) {
outputWriter.sync()
}
outputStream = null
}
override def write(element: OutputContainer) = {
if (outputStream == null) {
throw new IllegalStateException("AvroWriter has not been opened.")
}
outputWriter.append(element)
}
override def duplicate(): AvroWriter[OutputContainer] = {
new AvroWriter[OutputContainer]
}
}
尝试使用上述代码运行 RollingSink 会出现以下异常:
java.lang.Exception: Could not forward element to next operator
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:664)
Caused by: java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1353)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:98)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:366)
at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:383)
at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:401)
at pl.neptis.FlinkKafkaConsumer.utils.AvroWriter.close(AvroWriter.scala:36)
at org.apache.flink.streaming.connectors.fs.RollingSink.closeCurrentPartFile(RollingSink.java:476)
at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:419)
at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:373)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
... 3 more