4

我有一个关于使用 pysparkSQL 与 delta 表合并 csv 文件的问题。我设法创建了 upsert 函数,如果匹配则更新,如果不匹配则插入。

我想将列添加ID到最终的增量表中,并在每次插入数据时增加它。此列标识我们的增量表中的每一行。有没有办法把它到位?

def Merge(dict1, dict2):
    res = {**dict1, **dict2}
    return res

def create_default_values_dict(correspondance_df,marketplace):
    dict_output = {}
    for field in get_nan_keys_values(get_mapping_dict(correspondance_df, marketplace)):
        dict_output[field] = 'null'
        # We want to increment the id row each time we perform an insertion (TODO TODO TODO)
#         if field == 'id':
#             dict_output['id'] = col('id')+1
#         else:    
    return dict_output


def create_matched_update_dict(mapping, products_table, updates_table):
    output = {}
    for k,v in mapping.items():
        if k == 'source_name':
            output['products.source_name'] = lit(v)
        else:
            output[products_table + '.' + k] = F.when(col(updates_table + '.' + v).isNull(), col(products_table + '.' + k)).when(col(updates_table + '.' + v).isNotNull(), col(updates_table + '.' + v))     
    return output    

insert_dict = create_not_matched_insert_dict(mapping, 'products', 'updates')
default_dict = create_default_values_dict(correspondance_df_products, 'Cdiscount')

insert_values = Merge(insert_dict, default_dict)
update_values = create_matched_update_dict(mapping, 'products', 'updates')

delta_table_products.alias('products').merge(
    updates_df_table.limit(20).alias('updates'),
    "products.barcode_ean == updates.ean") \
    .whenMatchedUpdate(set = update_values) \
    .whenNotMatchedInsert(values = insert_values)\
    .execute()

我试图增加id函数中的列,create_default_values_dict但它似乎不能正常工作,它不会自动增加 1。还有其他方法可以解决这个问题吗?提前致谢 :)

4

2 回答 2

1

Delta 不支持自增列类型。

一般来说,Spark 不使用自增 ID,而是倾向于单调递增的 ID。见functions.monotonically_increasing_id()

如果要实现自动增量行为,则必须使用多个 Delta 操作,例如,查询最大值 + 将其添加到row_number()通过窗口函数计算的列中 + 然后写入。这是有问题的,原因有两个:

  1. 除非您引入外部锁定机制或其他方式来确保在找到最大值和写入之间不会发生对表的更新,否则您最终可能会得到无效数据。

  2. 使用row_number()会将并行度降低到 1,强制所有数据通过单个内核,这对于大数据将非常慢。

最重要的是,您真的不想在 Spark 中使用自增列。

希望这可以帮助。

于 2020-01-21T07:33:26.443 回答
0

Databricks 具有托管 Spark 的 IDENTITY 列

https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-create-table-using.html#parameters

也许这将很快开源。

GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY
 [ ( [ START WITH start ] [ INCREMENT BY step ] ) ] 

这适用于 Delta 表。

于 2022-03-01T15:39:11.693 回答