1

我在代码逻辑中遇到了一个小问题。

我正在将使用 pandas 数据帧的代码行转换为使用 Koalas 数据帧,并且在代码执行期间出现以下错误。

# Error Message
PandasNotImplementedError: The method `pd.Index.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.

#Sample dataframe

# initialize data of lists.
data = {'team_code':['A1', 'S1'],
        'TeamName':['JohnTeam', 'SusanTeam']}
 
# Create DataFrame
input_df= pd.DataFrame(data)

使用 pandas 数据框的原始代码行如下所示:

# input_df is a pandas df here

to_remove_df = input_df.drop_duplicates(['team_code', 'TeamName'])

dropped_df = input_df[~input_df.index.isin(to_remove_df.index)].copy().reset_index(drop=True)
    

我使用数据框转换了上面的代码,如下所示。

使用 pandas 数据框的原始代码行如下所示。唯一的区别是 input_df 现在是一个考拉数据框:

# input_df is now a koalas df here

to_remove_df = input_df.drop_duplicates(['team_code', 'TeamName'])

dropped_df = input_df[~input_df.index.isin(to_remove_df.index)].copy().reset_index(drop=True)
    

编辑

使用 Spark 3.2.0 和 koalas==1.8.2 的数据块集群上的 Stacktrace

---------------------------------------------------------------------------
PandasNotImplementedError                 Traceback (most recent call last)
<command-2399235872097642> in <module>
      2 to_remove_df = input_df.drop_duplicates(['team_code', 'TeamName'])
      3 print(to_remove_df)
----> 4 dropped_df = input_df[~input_df.index.isin(to_remove_df.index)].copy().reset_index(drop=True)
      5 dropped_df

/databricks/python/lib/python3.8/site-packages/databricks/koalas/usage_logging/__init__.py in wrapper(*args, **kwargs)
    193             start = time.perf_counter()
    194             try:
--> 195                 res = func(*args, **kwargs)
    196                 logger.log_success(
    197                     class_name, function_name, time.perf_counter() - start, signature

/databricks/python/lib/python3.8/site-packages/databricks/koalas/usage_logging/__init__.py in wrapper(*args, **kwargs)
    188         if hasattr(_local, "logging") and _local.logging:
    189             # no need to log since this should be internal call.
--> 190             return func(*args, **kwargs)
    191         _local.logging = True
    192         try:

/databricks/python/lib/python3.8/site-packages/databricks/koalas/base.py in isin(self, values)
   1234             )
   1235 
-> 1236         values = values.tolist() if isinstance(values, np.ndarray) else list(values)
   1237         return self._with_new_scol(self.spark.column.isin(values))
   1238 

/databricks/python/lib/python3.8/site-packages/databricks/koalas/indexes/base.py in __iter__(self)
   2479 
   2480     def __iter__(self):
-> 2481         return MissingPandasLikeIndex.__iter__(self)
   2482 
   2483     def __xor__(self, other):

/databricks/python/lib/python3.8/site-packages/databricks/koalas/usage_logging/__init__.py in wrapper(*args, **kwargs)
    248     def wrapper(*args, **kwargs):
    249         try:
--> 250             return func(*args, **kwargs)
    251         finally:
    252             logger.log_missing(class_name, function_name, is_deprecated, signature)

/databricks/python/lib/python3.8/site-packages/databricks/koalas/missing/__init__.py in unsupported_function(*args, **kwargs)
     20 def unsupported_function(class_name, method_name, deprecated=False, reason=""):
     21     def unsupported_function(*args, **kwargs):
---> 22         raise PandasNotImplementedError(
     23             class_name=class_name, method_name=method_name, reason=reason
     24         )

PandasNotImplementedError: The method `pd.Index.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.

input_df当使用作为考拉数据框时,我无法弄清楚我在上面的代码中遗漏了什么。有人可以帮我解决吗?

谢谢你们!

4

1 回答 1

0

看起来您的过滤方法正在__iter__()幕后使用,目前考拉不支持。

我建议您定义一个自定义函数并将您的数据框传递给它的替代方法。这样,您应该获得与使用 pandas 代码相同的结果。函数的详细解释逐行编写。

def my_func(df):
  
  # be sure to create a column with unique identifiers
  df = df.reset_index(drop=True).reset_index()
  
  # create dataframe to be removed
  # the additional dummy column is needed to correctly filter out rows later on
  to_remove_df = df.drop_duplicates(['team_code', 'TeamName'])[['index']]
  to_remove_df = to_remove_df.assign(check = lambda x: 'remove_me')
  
  # merge the two dataframes and remove rows
  merged_df = df.merge(to_remove_df, on='index', how='outer')
  result = merged_df.loc[merged_df['check'].isna()]
  
  # drop unnecessary columns
  result = result.drop(columns=['index', 'check'])
  
  return result

示例 1

# your data
data = {'team_code':['A1', 'S1'],
        'TeamName':['JohnTeam', 'SusanTeam']}
input_df = ks.DataFrame(data)


df = my_func(input_df)
print(df)
# Empty DataFrame
# Columns: [team_code, TeamName]
# Index: []

示例 2

# other sample data
data = {'team_code':['A1', 'S1', 'A1', 'S1'],
        'TeamName':['JohnTeam', 'SusanTeam', 'RickTeam', 'SusanTeam']}
input_df = ks.DataFrame(data)


df = my_func(input_df)
print(df)
#   team_code   TeamName
# 3        S1  SusanTeam
于 2022-02-07T14:14:21.643 回答