I am new to Apache Hudi. I am writing a script to move all the tables registered in the AWS Glue catalog into Apache Hudi tables on S3. Below is the custom script I am running. It does not throw any errors, but somehow some columns end up missing in the target folder.
import sys
from datetime import datetime
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
# @params: [JOB_NAME, exetype]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'exetype'])
# for Hudi - use this specific pattern
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init('data', args)
exetype = args['exetype']
table_list = [('table1', 'primary key'),
              ('table2', 'primary key'),
              ('table3', 'primary key'),
              ('table4', 'primary key')]
for table, pk in table_list:
    # read the source table from the Glue catalog
    dsource0 = glueContext.create_dynamic_frame.from_catalog(
        database="postgres",
        table_name=table,
        transformation_ctx=table,
    )

    # Hudi config section
    commonConfig = {
        'className': 'org.apache.hudi',
        'hoodie.datasource.hive_sync.use_jdbc': 'false',
        'hoodie.datasource.write.precombine.field': pk,
        'hoodie.datasource.write.recordkey.field': pk,
        'hoodie.table.name': table,
        'hoodie.datasource.hive_sync.database': 'postgres',
        'hoodie.datasource.hive_sync.table': table,
        'hoodie.datasource.hive_sync.enable': 'false',
        'hoodie.consistency.check.enabled': 'false',
        'hoodie.parquet.max.file.size': 125829120,
        'hoodie.index.type': 'BLOOM',
        'hoodie.index.bloom.num_entries': 60000,
        'hoodie.index.bloom.fpp': 0.000000001,
        'hoodie.cleaner.commits.retained': 10,
        'path': f's3://transformations/data/hudi/{table}/'
    }
    initLoadConfig = {
        'hoodie.bulkinsert.shuffle.parallelism': 68,
        'hoodie.datasource.write.operation': 'bulk_insert'
    }
    upsertConfig = {
        'hoodie.upsert.shuffle.parallelism': 68,
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
        'hoodie.cleaner.commits.retained': 10
    }
    deleteConfig = {
        'hoodie.upsert.shuffle.parallelism': 68,
        'hoodie.datasource.write.operation': 'delete',
        'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
        'hoodie.cleaner.commits.retained': 10
    }
    unpartitionDataConfig = {
        'hoodie.datasource.hive_sync.partition_extractor_class':
            'org.apache.hudi.hive.NonPartitionedExtractor',
        'hoodie.datasource.write.keygenerator.class':
            'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
    }
    combinedConfUnPart = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
    combinedConfDeleteUnPart = {**commonConfig, **unpartitionDataConfig, **deleteConfig}
    combinedConfUpsertUnpart = {**commonConfig, **unpartitionDataConfig, **upsertConfig}

    # NOTE: both DataFrames currently hold the full source data (no CDC/operation filter)
    insert = dsource0.toDF()
    delete = dsource0.toDF()

    if dsource0.count() > 0:
        if exetype == 'bulk':
            # initial load: bulk_insert everything through the Hudi connector
            glueContext.write_dynamic_frame.from_options(
                frame=dsource0, connection_type="marketplace.spark",
                connection_options=combinedConfUnPart)
        else:
            # incremental run: upsert, then delete
            glueContext.write_dynamic_frame.from_options(
                frame=DynamicFrame.fromDF(insert, glueContext, "insert"),
                connection_type="marketplace.spark",
                connection_options=combinedConfUpsertUnpart)
            glueContext.write_dynamic_frame.from_options(
                frame=DynamicFrame.fromDF(delete, glueContext, "delete"),
                connection_type="marketplace.spark",
                connection_options=combinedConfDeleteUnPart)

job.commit()
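This is roughly how I compare the source columns against what lands in S3 afterwards, using the first table as an example (the path and table name are placeholders; I am assuming spark.read.format('hudi') works with the connector jars the job already has):

# Roughly how I compare source vs. target columns for one table (placeholder names)
src_df = glueContext.create_dynamic_frame.from_catalog(
    database="postgres", table_name="table1").toDF()

# read the Hudi output back from S3
tgt_df = spark.read.format("hudi").load("s3://transformations/data/hudi/table1/")

# Hudi adds its own metadata columns (_hoodie_*), so I ignore those when comparing
tgt_cols = [c for c in tgt_df.columns if not c.startswith("_hoodie_")]
missing = set(src_df.columns) - set(tgt_cols)
print("columns missing in target:", missing)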
The code does store the data in Hudi, but when I compare the source columns with the target columns (as shown above), they do not match and some columns are missing. Can you help me see what I am doing wrong? Is it a problem with the configuration or with the code? I also have a question: what should I do when a table has multiple primary keys, and when there are multiple such tables to move? Can you help me with this as well?
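For the multiple-primary-key case, I am guessing the record key would need to be a comma-separated list together with a different key generator, something like the sketch below, but I am not sure this is right (the table and column names here are made up):

# Hypothetical entry for a table whose primary key is (order_id, line_id)
table_list = [('orders_items', 'order_id,line_id')]

# My guess: a comma-separated key needs ComplexKeyGenerator instead of
# NonpartitionedKeyGenerator, while the precombine field stays a single column.
compositeKeyConfig = {
    'hoodie.datasource.write.recordkey.field': 'order_id,line_id',
    'hoodie.datasource.write.precombine.field': 'updated_at',  # made-up timestamp column
    'hoodie.datasource.write.keygenerator.class':
        'org.apache.hudi.keygen.ComplexKeyGenerator'
}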
Thank you very much.