我想将 Parquets 写入 HDFS
就个人而言,我不会为此使用 Spark。
相反,我会使用HDFS Kafka Connector。这是一个可以帮助您入门的配置文件。
name=hdfs-sink
# List of topics to read
topics=test_hdfs
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
# increase to be the sum of the partitions for all connected topics
tasks.max=1
# the folder where core-site.xml and hdfs-site.xml exist
hadoop.conf.dir=/etc/hadoop
# the namenode url, defined as fs.defaultFS in the core-site.xml
hdfs.url=hdfs://hdfs-namenode.example.com:9000
# number of messages per file
flush.size=10
# The format to write the message values
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
# Setup Avro parser
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry.example.com:8081
value.converter.schemas.enable=true
schema.compatibility=BACKWARD
如果您想要基于字段而不是文字“Kafka 分区”编号的 HDFS 分区,请参阅FieldPartitioner
. 如果您想要自动 Hive 集成,请参阅相关文档。
假设您确实想使用 Spark,但是,您可以尝试AbsaOSS/ABRiS来读取 Avro DataFrame,然后您应该能够执行类似的操作df.write.format("parquet").path("/some/path")
(不是确切的代码,因为我还没有尝试过)