I can't assign a Series as a new column to a Koalas DataFrame. Here is the code I'm using:

from databricks import koalas

dft=koalas.DataFrame({'a':[1,2,3],'b':[3,4,5]})
dft.assign(c=koalas.Series([1,2,3]))

Output:

AnalysisException                         Traceback (most recent call last)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/core/formatters.py in __call__(self, obj)
    700                 type_pprinters=self.type_printers,
    701                 deferred_pprinters=self.deferred_printers)
--> 702             printer.pretty(obj)
    703             printer.flush()
    704             return stream.getvalue()

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/lib/pretty.py in pretty(self, obj)
    392                         if cls is not object \
    393                                 and callable(cls.__dict__.get('__repr__')):
--> 394                             return _repr_pprint(obj, self, cycle)
    395 
    396             return _default_pprint(obj, self, cycle)

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
    698     """A pprint that just redirects to the normal repr function."""
    699     # Find newlines and replace them with p.break_()
--> 700     output = repr(obj)
    701     lines = output.splitlines()
    702     with p.group():

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in __repr__(self)
  11661             return self._to_internal_pandas().to_string()
  11662 
> 11663         pdf = self._get_or_create_repr_pandas_cache(max_display_count)
  11664         pdf_length = len(pdf)
  11665         pdf = pdf.iloc[:max_display_count]

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
  11652         if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
  11653             object.__setattr__(
> 11654                 self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  11655             )
  11656         return self._repr_pandas_cache[n]

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in head(self, n)
   5748             return DataFrame(self._internal.with_filter(F.lit(False)))
   5749         else:
-> 5750             sdf = self._internal.resolved_copy.spark_frame
   5751             if get_option("compute.ordered_head"):
   5752                 sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME)

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
    576     def wrapped_lazy_property(self):
    577         if not hasattr(self, attr_name):
--> 578             setattr(self, attr_name, fn(self))
    579         return getattr(self, attr_name)
    580 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/internal.py in resolved_copy(self)
   1066     def resolved_copy(self) -> "InternalFrame":
   1067         """ Copy the immutable InternalFrame with the updates resolved. """
-> 1068         sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS))
   1069         return self.copy(
   1070             spark_frame=sdf,

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/dataframe.py in select(self, *cols)
   1683         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1684         """
-> 1685         jdf = self._jdf.select(self._jcols(*cols))
   1686         return DataFrame(jdf, self.sql_ctx)
   1687 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/core/formatters.py in __call__(self, obj)
    343             method = get_real_method(obj, self.print_method)
    344             if method is not None:
--> 345                 return method()
    346             return None
    347         else:

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _repr_html_(self)
  11684             return self._to_internal_pandas().to_html(notebook=True, bold_rows=bold_rows)
  11685 
> 11686         pdf = self._get_or_create_repr_pandas_cache(max_display_count)
  11687         pdf_length = len(pdf)
  11688         pdf = pdf.iloc[:max_display_count]

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
  11652         if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
  11653             object.__setattr__(
> 11654                 self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  11655             )
  11656         return self._repr_pandas_cache[n]

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in head(self, n)
   5748             return DataFrame(self._internal.with_filter(F.lit(False)))
   5749         else:
-> 5750             sdf = self._internal.resolved_copy.spark_frame
   5751             if get_option("compute.ordered_head"):
   5752                 sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME)

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
    576     def wrapped_lazy_property(self):
    577         if not hasattr(self, attr_name):
--> 578             setattr(self, attr_name, fn(self))
    579         return getattr(self, attr_name)
    580 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/internal.py in resolved_copy(self)
   1066     def resolved_copy(self) -> "InternalFrame":
   1067         """ Copy the immutable InternalFrame with the updates resolved. """
-> 1068         sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS))
   1069         return self.copy(
   1070             spark_frame=sdf,

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/dataframe.py in select(self, *cols)
   1683         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1684         """
-> 1685         jdf = self._jdf.select(self._jcols(*cols))
   1686         return DataFrame(jdf, self.sql_ctx)
   1687 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false

Could you help me understand what is wrong with my approach and how to assign a new column to a Koalas DataFrame?

2 Answers

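To be honest, I don't know why you get that error with assign, but one way to add a new column to a koalas.DataFrame is to use the standard bracket assignment, dft['c'] = ..., as shown below.
It is important to set the option compute.ops_on_diff_frames to True so that operations between different Series/DataFrames are allowed.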

import databricks.koalas as ks
ks.set_option('compute.ops_on_diff_frames', True)

dft = ks.DataFrame({'a':[1,2,3],'b':[3,4,5]})
dft['c'] = ks.Series([1,2,3])

dft
#    a  b  c
# 0  1  3  1
# 1  2  4  2
# 2  3  5  3
answered 2021-12-15T10:20:25.633

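Unfortunately, in the assign method you can only use expressions over existing columns of your dataframe.

Explanation

The important part of the error stack is the Spark execution plan: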

AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false

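In a Spark execution plan, Project can be read as SQL's SELECT. You can see that the plan fails at the second Project (Spark execution plans are read from bottom to top) because it can't find column 0#991184L (that is, the series you want to add to your dft dataframe) among the columns present in dft, which are __index_level_0__#991164L, a#991165L, b#991166L and __natural_order__#991170L.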

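Indeed, column 0#991184L comes from a series you created on the fly, not from a series derived from your dft dataframe. For Spark, this means the column belongs to another dataframe, so it cannot be retrieved from dft with a SELECT, which is exactly what Spark is trying to do.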
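The resolution failure described above can be imitated in plain Python. This is only a toy model, not Spark's analyzer: the project function and the LookupError it raises are hypothetical names chosen for illustration, and the attribute IDs are copied from the plan shown above.

```python
# Toy model (not Spark's actual implementation): every column is an
# attribute with a unique ID, and a projection may only reference
# attribute IDs that its child plan produces.

def project(child_output, requested):
    """Return the requested attributes, or fail if any is missing from the child."""
    missing = [attr for attr in requested if attr not in child_output]
    if missing:
        raise LookupError(
            "Resolved attribute(s) " + ",".join(missing)
            + " missing from " + ",".join(child_output)
        )
    return list(requested)

# Attributes produced by dft (IDs copied from the plan above).
dft_output = [
    "__index_level_0__#991164L",
    "a#991165L",
    "b#991166L",
    "__natural_order__#991170L",
]

# Selecting dft's own columns resolves fine.
ok = project(dft_output, ["a#991165L", "b#991166L"])

# 0#991184L belongs to the separately created Series, so it cannot be resolved.
try:
    project(dft_output, dft_output + ["0#991184L"])
    error = None
except LookupError as exc:
    error = str(exc)

print(error)
```

The second call fails for the same reason the real plan does: the requested ID was never produced by the child of the projection.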

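To link the pandas and Spark APIs: the Spark equivalent of assign would be the withColumn method of Spark dataframes, whose documentation states:

The column expression must be an expression over this DataFrame; attempting to add a column from some other DataFrame will raise an error.

Note: strictly speaking, the Spark equivalent of assign is closer to select, since withColumn is limited to adding a single column, but the limitation of withColumn also applies to select.

So assign works for the following cases: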

from databricks import koalas

dft = koalas.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})
# dft['a'] and dft['b'] are Series derived from dft itself
dft.assign(c=dft['a'])
dft.assign(d=dft['a'] * 2)
dft.assign(e=dft['a'] * dft['b'])

But it does not work for the following cases:

dft = koalas.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})

# Series created on its own, not derived from dft
dft.assign(c=koalas.Series([1, 2, 3]))

dft2 = koalas.DataFrame({'d': [1, 2, 3]})
# dft2['d'] is a Series, but it belongs to another DataFrame
dft.assign(d=dft2['d'])

Workaround

The workaround here is to do as explained in ric-s's answer and assign the column using dft['c'] = koalas.Series([1,2,3])

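It works here because, in this case, Spark joins the two dataframes instead of merely selecting columns from the first one. Since a join, hidden here by the koalas API, can be a very expensive operation in Spark, there is a guardrail that you need to override by setting compute.ops_on_diff_frames to True.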
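To make the "join instead of select" idea concrete, here is a toy sketch in plain Python. It is not Koalas internals: it assumes the assignment behaves like a left join on the index, and assign_from_other is a hypothetical helper written only for this illustration.

```python
# Toy model of an index-aligned left join (plain Python, not Koalas internals).

# dft, keyed by index label.
left = {0: {"a": 1, "b": 3}, 1: {"a": 2, "b": 4}, 2: {"a": 3, "b": 5}}
# The new Series, keyed by its own index labels.
right = {0: 1, 1: 2, 2: 3}

def assign_from_other(left_rows, column, right_values):
    """Attach right_values to left_rows by matching index labels (left join)."""
    return {
        idx: {**row, column: right_values.get(idx)}  # None when the index has no match
        for idx, row in left_rows.items()
    }

joined = assign_from_other(left, "c", right)
print(joined)
```

Every row of the left side has to be matched against the right side by index. On a distributed engine that matching generally means moving data between partitions, which is why the operation is guarded by an option.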

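Setting compute.ops_on_diff_frames to True just tells koalas "I acknowledge that this operation is a join and may lead to poor performance". You can reset this option to its default value after performing your operation, using koalas.reset_option('compute.ops_on_diff_frames')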

answered 2021-12-16T23:54:10.503