0

我正在使用 Confluent 的 KafkaAvroDerserializer 反序列化通过 Kafka 发送的 Avro 对象。我想将收到的数据写入 Parquet 文件。我希望能够将数据附加到同一个实木复合地板并创建一个带有分区的实木复合地板。

我设法用 AvroParquetWriter 创建了一个 Parquet - 但我没有找到如何添加分区或附加到同一个文件:

在使用 Avro 之前,我使用 spark 编写 Parquet - 使用 spark 编写带分区的 parquet 并使用附加模式很简单 - 我应该尝试从我的 Avro 对象创建 Rdds 并使用 spark 来创建 parquet 吗?

4

1 回答 1

0

我想将 Parquets 写入 HDFS

就个人而言,我不会为此使用 Spark。

相反,我会使用HDFS Kafka Connector。这是一个可以帮助您入门的配置文件。

name=hdfs-sink
# List of topics to read
topics=test_hdfs

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
# increase to be the sum of the partitions for all connected topics
tasks.max=1 

# the folder where core-site.xml and hdfs-site.xml exist
hadoop.conf.dir=/etc/hadoop
# the namenode url, defined as fs.defaultFS in the core-site.xml
hdfs.url=hdfs://hdfs-namenode.example.com:9000

# number of messages per file
flush.size=10 
# The format to write the message values
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

# Setup Avro parser
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry.example.com:8081
value.converter.schemas.enable=true
schema.compatibility=BACKWARD

如果您想要基于字段而不是文字“Kafka 分区”编号的 HDFS 分区,请参阅FieldPartitioner. 如果您想要自动 Hive 集成,请参阅相关文档。


假设您确实想使用 Spark,但是,您可以尝试AbsaOSS/ABRiS来读取 Avro DataFrame,然后您应该能够执行类似的操作df.write.format("parquet").path("/some/path")(不是确切的代码,因为我还没有尝试过)

于 2018-11-18T08:23:56.787 回答