2

我正在研究一种解决方案,该解决方案使用以下代码通过给定索引更新 delta 湖:

      dataframe = ks.read_table('data')
      subdataframe = dataframe .loc[dataframe ['status']== 1,:]


      for index,column in subdataframe.iterrows():
        
        #get values for a given row
        record= subdataframe.loc[index].to_dict()

        if(record_needs_updating):
          #update deltalake
          dataframe.loc[dataframe['file']==record['file'],'status'] = 0
          dataframe.to_delta('fileloc', partition_cols='pull',mode='overwrite')

          #update databricks table
          spark.sql("DROP TABLE  IF EXISTS data")
          spark.sql("CREATE TABLE data USING DELTA LOCATION fileloc)
          spark.sql("OPTIMIZE data")

我遇到的问题是尝试在 for 循环中索引子数据帧时出现的关键错误。

这似乎是因为数据框本身在更新 delta 湖后被更新为不包含任何 status = 0 的记录,这意味着索引发生了变化,从而给出了一个关键错误。

有什么方法可以将子数据帧变成一个非实时数据帧,随着 deltalake 的更新,该数据帧不会被更新?

还要注意我需要在代码运行时更新,而不是在所有代码运行后只更新一次。

谢谢!

4

0 回答 0