我正在研究一种解决方案,该解决方案使用以下代码通过给定索引更新 delta 湖:
dataframe = ks.read_table('data')
subdataframe = dataframe .loc[dataframe ['status']== 1,:]
for index,column in subdataframe.iterrows():
#get values for a given row
record= subdataframe.loc[index].to_dict()
if(record_needs_updating):
#update deltalake
dataframe.loc[dataframe['file']==record['file'],'status'] = 0
dataframe.to_delta('fileloc', partition_cols='pull',mode='overwrite')
#update databricks table
spark.sql("DROP TABLE IF EXISTS data")
spark.sql("CREATE TABLE data USING DELTA LOCATION fileloc)
spark.sql("OPTIMIZE data")
我遇到的问题是尝试在 for 循环中索引子数据帧时出现的关键错误。
这似乎是因为数据框本身在更新 delta 湖后被更新为不包含任何 status = 0 的记录,这意味着索引发生了变化,从而给出了一个关键错误。
有什么方法可以将子数据帧变成一个非实时数据帧,随着 deltalake 的更新,该数据帧不会被更新?
还要注意我需要在代码运行时更新,而不是在所有代码运行后只更新一次。
谢谢!