Has anyone used Apache Hudi in a PySpark environment? If possible, are there any code examples available?

1 Answer

Here is a working PySpark example covering INSERT, UPDATE, and READ operations:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Hudi requires the Kryo serializer; the hudi-spark bundle and spark-avro
# packages must match your Spark/Scala build (here Spark 3.0.x / Scala 2.12).
spark = (
    SparkSession.builder.appName("Hudi_Data_Processing_Framework")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config(
        "spark.jars.packages",
        "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.2"
    )
    .getOrCreate()
)

input_df = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ("id", "creation_date", "last_update_time"),
)

hudi_options = {
    # ---------------DATA SOURCE WRITE CONFIGS---------------#
    "hoodie.table.name": "hudi_test",
    # Unique key for each record; upserts match rows on this field
    "hoodie.datasource.write.recordkey.field": "id",
    # When two records share a key, the one with the larger precombine value wins
    "hoodie.datasource.write.precombine.field": "last_update_time",
    "hoodie.datasource.write.partitionpath.field": "creation_date",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.upsert.shuffle.parallelism": 1,
    "hoodie.insert.shuffle.parallelism": 1,
    "hoodie.consistency.check.enabled": True,
    "hoodie.index.type": "BLOOM",
    "hoodie.index.bloom.num_entries": 60000,
    "hoodie.index.bloom.fpp": 0.000000001,
    # Keep only the file versions of the last N commits
    "hoodie.cleaner.commits.retained": 2,
}

# INSERT - the first append to a new path creates the Hudi table
(
    input_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save("/tmp/hudi_test")
)

# UPDATE - upsert (the default write operation) matches rows on the record key
update_df = input_df.limit(1).withColumn(
    "last_update_time", lit("2016-01-01T13:51:39.340396Z")
)
(
    update_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save("/tmp/hudi_test")
)

# READ - the trailing glob descends into the partition directories
output_df = spark.read.format("org.apache.hudi").load(
    "/tmp/hudi_test/*/*"
)

output_df.show()
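
Two common variations on the same table, in case they help. These are minimal sketches: the option keys below (hoodie.datasource.query.type, hoodie.datasource.read.begin.instanttime, hoodie.datasource.write.operation) are what I'd expect for this Hudi version, so verify them against the docs for the release you use.

# INCREMENTAL READ - pull only records committed after a given instant time.
# "000" is a placeholder meaning "from the beginning"; in practice pass the
# commit timestamp of your last successful sync.
incremental_df = (
    spark.read.format("org.apache.hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "000")
    .load("/tmp/hudi_test")
)
incremental_df.show()

# DELETE - reuse the write configs but switch the operation to "delete";
# only the record keys (and partition paths) of delete_df matter here.
delete_df = input_df.limit(1)
(
    delete_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .option("hoodie.datasource.write.operation", "delete")
    .mode("append")
    .save("/tmp/hudi_test")
)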
answered 2021-03-22T13:16:10.847