1

我正在尝试在 Flink 的 sink 中使用 AvroParquetWriter 写入 Parquet 文件。文件已经创建,但长度为 0(没有写入任何数据)。是我哪里做错了吗?我找不出问题出在哪里。

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import scala.io.Source
import org.apache.flink.streaming.api.scala._

/**
 * Minimal reproduction: builds a single Avro record, writes it to a Parquet
 * file once directly and once more from a Flink sink, then runs the job.
 */
object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Declared as a def, so every call reads the clock again; it is called once, when `path` is built.
  def now = System.currentTimeMillis()
  // Output file; the timestamp in the name makes each run target a new file.
  val path = new Path(s"/tmp/test-$now.parquet")
  // The Avro schema is read from the classpath resource /request_schema.avsc and parsed.
  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  // Supplies the pageSize, blockSize, enableDictionary and validating values passed to the builder below.
  val config = ParquetWriterConfig()
  // The one record used in this test; it sets the fields "name", "code" and "ts".
  val genericReocrd: GenericRecord = new GenericData.Record(schema)
  genericReocrd.put("name", "test_b")
  genericReocrd.put("code", "NoError")
  genericReocrd.put("ts", 100L)
  // Single-element stream containing that record.
  val stream = env.fromElements(genericReocrd)
  // The writer is built here, in the object body, and opens `path` in OVERWRITE mode.
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(schema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()

  // Direct write of the record, outside of any Flink operator.
  writer.write(genericReocrd)
  // The sink lambda closes over `writer` from the enclosing object.
  // NOTE(review): Flink presumably has to serialize this closure to ship it to
  // the sink task; confirm that a captured ParquetWriter survives that.
  stream.addSink{r =>
    writer.write(r)
  }
  // NOTE(review): nothing in this block ever calls writer.close(). Presumably the
  // writer buffers rows and only flushes them (and the file footer) on close,
  // which would match the zero-length output file; confirm against the Parquet docs.
  env.execute()
4

1 回答 1

0

问题在于你没有关闭 ParquetWriter。必须关闭它,才能把缓冲中尚未写出的元素刷新到磁盘。你可以定义自己的 RichSinkFunction,并在它的 close 方法中关闭 ParquetWriter,来解决这个问题:

/**
 * Flink sink that writes every incoming [[GenericRecord]] to one Parquet file
 * and closes the underlying writer when the sink is closed, so that buffered
 * records are flushed to disk.
 *
 * The writer is created in `open` rather than in the constructor, so only the
 * constructor arguments have to travel with the serialized function.
 *
 * NOTE(review): every sink instance opens the same `path` in OVERWRITE mode;
 * presumably this is meant to run with a sink parallelism of 1 -- confirm.
 *
 * @param path                 location of the Parquet file to write
 * @param schema               Avro schema as a JSON string; it is parsed in `open`
 * @param compressionCodecName compression codec handed to the writer builder
 * @param config               source of page size, row-group size, dictionary and validation settings
 */
class ParquetWriterSink(val path: String, val schema: String, val compressionCodecName: CompressionCodecName, val config: ParquetWriterConfig) extends RichSinkFunction[GenericRecord] {
  // Stays null until open() has run. Marked @transient so the writer itself is
  // never included when the function object is serialized.
  @transient var parquetWriter: ParquetWriter[GenericRecord] = null

  /** Builds the Parquet writer for `path` from the constructor arguments. */
  override def open(parameters: Configuration): Unit = {
    parquetWriter = AvroParquetWriter.builder[GenericRecord](new Path(path))
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(compressionCodecName)
      .withPageSize(config.pageSize)
      .withRowGroupSize(config.blockSize)
      .withDictionaryEncoding(config.enableDictionary)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .withValidation(config.validating)
      .build()
  }

  /**
   * Closes the writer if one was opened. Safe to call when `open` never ran or
   * failed before assigning the writer, and safe to call more than once.
   */
  override def close(): Unit = {
    val writer = parquetWriter
    // Drop the reference before closing so a repeated close() becomes a no-op.
    parquetWriter = null
    // Option(...) turns a null writer into None instead of throwing a NullPointerException.
    Option(writer).foreach(_.close())
  }

  /** Writes one record to the Parquet file. */
  override def invoke(value: GenericRecord, context: SinkFunction.Context[_]): Unit = {
    parquetWriter.write(value)
  }
}
于 2018-11-29T10:19:47.920 回答