我正在尝试使用 AvroParquetWriter 将 Parquet 文件作为接收器(sink)写出。文件已经创建,但长度为 0(没有写入任何数据)。是我哪里做错了吗?我找不出问题出在哪里。
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._
object Tester extends App {
// Small Flink job that writes Avro GenericRecords into a single Parquet file.
//
// A ParquetWriter buffers records in memory and only writes the row group and
// the file footer when close() is called. If it is never closed, the output
// file exists but stays empty, so the writer is always closed in a `finally`
// once the job has finished.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Re-evaluated on every call, so each use yields the current epoch milliseconds.
def now = System.currentTimeMillis()

// Output file; the timestamp suffix makes the name unique per run.
val path = new Path(s"/tmp/test-$now.parquet")

// Read the Avro schema from the classpath and release the underlying stream
// as soon as its text has been read.
val schemaSource = Source.fromURL(getClass.getResource("/request_schema.avsc"))
val schemaString = try schemaSource.mkString finally schemaSource.close()
val schema: Schema = new Schema.Parser().parse(schemaString)

val compressionCodecName = CompressionCodecName.SNAPPY
// Page size, row-group size, dictionary and validation settings come from this config.
val config = ParquetWriterConfig()

// Single sample record; the field names must exist in request_schema.avsc.
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)

val stream = env.fromElements(genericReocrd)

val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
  .withSchema(schema)
  .withCompressionCodec(compressionCodecName)
  .withPageSize(config.pageSize)
  .withRowGroupSize(config.blockSize)
  .withDictionaryEncoding(config.enableDictionary)
  .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
  .withValidation(config.validating)
  .build()

try {
  // Written directly, outside of the Flink job.
  writer.write(genericReocrd)

  // Written by the Flink job. All sink invocations share the one writer above,
  // so the sink is pinned to a single subtask to avoid concurrent writes.
  // NOTE(review): sharing a driver-side writer with the sink presumably only
  // works when the job runs in this same JVM (local execution). For a cluster
  // deployment, open and close the writer inside a RichSinkFunction instead.
  val sink = stream.addSink { r =>
    writer.write(r)
  }
  sink.setParallelism(1)

  // Blocks until the job has finished, so every record has reached the writer.
  env.execute()
} finally {
  // Flushes the buffered row group and writes the Parquet footer; without this
  // call the file is left with zero bytes.
  writer.close()
}