0

我是 Apache Hudi 的新手。我正在编写一个脚本,把 AWS Glue Data Catalog 中的所有表写入 S3 上的 Apache Hudi。下面是我尝试执行的自定义脚本。它没有报错,但不知何故,目标文件夹中缺少了一些列。

   import sys
   from datetime import datetime
   from pyspark.context import SparkContext
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql.session import SparkSession
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.job import Job

    # @params: [JOB_NAME, exetype]
    # exetype == 'bulk' runs an initial bulk_insert; any other value runs an
    # upsert followed by a delete (see the write section at the bottom).
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'exetype'])

    # for Hudi - use this specific pattern (Kryo serializer, no metastore
    # Parquet conversion)
    spark = (
        SparkSession.builder
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.sql.hive.convertMetastoreParquet', 'false')
        .getOrCreate()
    )
    sc = spark.sparkContext
    glueContext = GlueContext(sc)
    job = Job(glueContext)
    # NOTE(review): the Glue samples pass args['JOB_NAME'] here; the literal
    # 'data' is kept to preserve the existing behavior -- confirm which is wanted.
    job.init('data', args)
    exetype = args['exetype']

    # (catalog table name, record key). The record key is either one column
    # name or a list/tuple of column names for a composite key, e.g.
    # ('orders', ['order_id', 'line_no']).
    table_list = [
        ('table1', 'primary key'),
        ('table2', 'primary key'),
        ('table3', 'primary key'),
        ('table4', 'primary key'),
    ]


    def _key_fields(pk):
        """Return (record key option value, precombine column) for *pk*.

        A plain string is used unchanged for both values. A list/tuple of
        column names is joined with commas for the record key, and its first
        column is used as the precombine field.
        """
        if isinstance(pk, str):
            return pk, pk
        columns = list(pk)
        if not columns:
            raise ValueError('at least one record key column is required')
        # presumably the precombine field must be a single column -- verify
        # against the Hudi version in use, and prefer a real ordering column
        # (e.g. an updated-at timestamp) if the tables have one.
        return ','.join(columns), columns[0]


    # Hudi Config section: the dicts below do not depend on the table, so they
    # are built once instead of on every loop iteration.
    initLoadConfig = {
        'hoodie.bulkinsert.shuffle.parallelism': 68,
        'hoodie.datasource.write.operation': 'bulk_insert',
    }
    upsertConfig = {
        'hoodie.upsert.shuffle.parallelism': 68,
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
        'hoodie.cleaner.commits.retained': 10,
    }
    deleteConfig = {
        'hoodie.upsert.shuffle.parallelism': 68,
        'hoodie.datasource.write.operation': 'delete',
        'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
        'hoodie.cleaner.commits.retained': 10,
    }
    unpartitionDataConfig = {
        'hoodie.datasource.hive_sync.partition_extractor_class':
            'org.apache.hudi.hive.NonPartitionedExtractor',
        'hoodie.datasource.write.keygenerator.class':
            'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    }

    for table, pk in table_list:
        dsource0 = glueContext.create_dynamic_frame.from_catalog(
            database="postgres",
            table_name=table,
            transformation_ctx=table,
        )

        # Nothing is written for an empty source table.
        if dsource0.count() == 0:
            continue

        recordKey, precombineField = _key_fields(pk)

        # Per-table Hudi options: table name, keys and target path.
        commonConfig = {
            'className': 'org.apache.hudi',
            'hoodie.datasource.hive_sync.use_jdbc': 'false',
            'hoodie.datasource.write.precombine.field': precombineField,
            'hoodie.datasource.write.recordkey.field': recordKey,
            'hoodie.table.name': table,
            'hoodie.datasource.hive_sync.database': 'postgres',
            'hoodie.datasource.hive_sync.table': table,
            'hoodie.datasource.hive_sync.enable': 'false',
            'hoodie.consistency.check.enabled': 'false',
            'hoodie.parquet.max.file.size': 125829120,
            "hoodie.index.type": "BLOOM",
            "hoodie.index.bloom.num_entries": 60000,
            "hoodie.index.bloom.fpp": 0.000000001,
            "hoodie.cleaner.commits.retained": 10,
            'path': f's3://transformations/data/hudi/{table}/',
        }

        if exetype == 'bulk':
            combinedConfUnPart = {
                **commonConfig, **unpartitionDataConfig, **initLoadConfig}
            glueContext.write_dynamic_frame.from_options(
                frame=dsource0,
                connection_type="marketplace.spark",
                connection_options=combinedConfUnPart,
            )
        else:
            combinedConfUpsertUnpart = {
                **commonConfig, **unpartitionDataConfig, **upsertConfig}
            combinedConfDeleteUnPart = {
                **commonConfig, **unpartitionDataConfig, **deleteConfig}
            # One DataFrame conversion feeds both writes (the original built
            # two identical DataFrames from the same DynamicFrame).
            sourceDf = dsource0.toDF()

            glueContext.write_dynamic_frame.from_options(
                frame=DynamicFrame.fromDF(sourceDf, glueContext, "insert"),
                connection_type="marketplace.spark",
                connection_options=combinedConfUpsertUnpart,
            )
            # NOTE(review): this delete receives the same rows that were just
            # upserted, so every upserted record key is also submitted for
            # deletion. Filter sourceDf down to the rows that should really be
            # removed (e.g. on a soft-delete flag) before this write -- confirm
            # the intended criterion.
            glueContext.write_dynamic_frame.from_options(
                frame=DynamicFrame.fromDF(sourceDf, glueContext, "delete"),
                connection_type="marketplace.spark",
                connection_options=combinedConfDeleteUnPart,
            )

    job.commit()

这段代码确实把数据写入了 Hudi,但是当我比较源表和目标表的列时,发现两边的列并不一致,目标中缺少了一些列。你能帮我看看我哪里做错了吗?是配置的问题,还是代码的问题?另外我还有一个问题:如果表有多个主键(复合主键),并且有多张表需要迁移,应该怎么处理?

非常感谢你

4

0 回答 0