0

我正在尝试在并行化代码中使用 Pandas“应用”,但“应用”根本不起作用。我们可以在使用 Spark(在 RDD 上并行化)时在分发给执行程序的代码中使用“应用”吗?

代码:

def testApply(k):
    return pd.DataFrame({'col1':k,'col2':[k*2]*5})

def testExec(x):
    df=pd.DataFrame({'col1':range(0,10)})
    ddf=pd.DataFrame(columns=['col1', 'col2'])
    ##In my case the below line doesn't get executed at all
    res= df.apply(lambda row: testApply(row.pblkGroup) if row.pblkGroup%2==0 else pd.DataFrame(), axis=1)

list1=[1,2,3,4]
sc=SparkContext.getOrCreate()
testRdd= sc.parallelize(list1)
output=testRdd.map(lambda x: testExec(x)).collect()



4

3 回答 3

0

看起来版本低于 0.21 的 Pandas 不支持此功能。我已经升级了 Pandas 版本,它工作正常。

于 2019-10-28T20:10:18.880 回答
0

我也遇到了同样的错误(TypeError: an integer is required (got type bytes)

from pyspark.context import SparkContext

TypeError                                 Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_11288/3937779276.py in <module>
----> 1 from pyspark.context import SparkContext

~\miniconda3\lib\site-packages\pyspark\__init__.py in <module>
     49 
     50 from pyspark.conf import SparkConf
---> 51 from pyspark.context import SparkContext
     52 from pyspark.rdd import RDD, RDDBarrier
     53 from pyspark.files import SparkFiles

~\miniconda3\lib\site-packages\pyspark\context.py in <module>
     29 from py4j.protocol import Py4JError
     30 
---> 31 from pyspark import accumulators
     32 from pyspark.accumulators import Accumulator
     33 from pyspark.broadcast import Broadcast, BroadcastPickleRegistry

~\miniconda3\lib\site-packages\pyspark\accumulators.py in <module>
     95     import socketserver as SocketServer
     96 import threading
---> 97 from pyspark.serializers import read_int, PickleSerializer
     98 
     99 

~\miniconda3\lib\site-packages\pyspark\serializers.py in <module>
     69     xrange = range
     70 
---> 71 from pyspark import cloudpickle
     72 from pyspark.util import _exception_message
     73 

~\miniconda3\lib\site-packages\pyspark\cloudpickle.py in <module>
    143 
    144 
--> 145 _cell_set_template_code = _make_cell_set_template_code()
    146 
    147 

~\miniconda3\lib\site-packages\pyspark\cloudpickle.py in _make_cell_set_template_code()
    124         )
    125     else:
--> 126         return types.CodeType(
    127             co.co_argcount,
    128             co.co_kwonlyargcount,

TypeError: an integer is required (got type bytes

)

于 2021-12-08T08:25:10.733 回答
0

要在 Spark 中使用 Pandas,您有 2 个选项:-

使用闭包

关于 Spark 的难点之一是在跨集群执行代码时了解变量和方法的范围和生命周期。修改其范围之外的变量的 RDD 操作可能是一个常见的混淆源。在下面的示例中,我们将查看使用 foreach() 来增加计数器的代码,但类似的问题也可能发生在其他操作中。

更多细节可以在这里找到[1]

例子

import numpy as np
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

spk_df = sqlContext.createDataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], ['t1', 't2', 't3', 't4'])
spk_df.show()

B = [2,0,1,0] 
V = [5,1,2,4]

def V_sum(row,b,c):
    return float(np.sum(c[row==b]))

v_sum_udf = F.udf(lambda row: V_sum(row, B, V), FloatType())    
spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))))

详细信息可以在这里找到[2]

使用 Pandas UDF

在 Spark 2.4.4 中,可以将 Pandas 与 Spark 一起使用。可以在此处找到详细信息以及示例 [3]

1 - http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures- 2 - pyspark 数据框上的自定义函数 3 - https://spark.apache.org/docs/latest /sql-pyspark-pandas-with-arrow.html

于 2019-10-25T03:18:43.473 回答