2

我正在尝试scipy.optimize.minimize对两列 pyspark 数据框使用函数。

在将x0参数作为数组传递给 Pandas UDF 函数时,我收到以下错误:

TypeError: Invalid argument, not a string or column: [0.9  0.5  2.5  5.   0.33] of type <class 'numpy.ndarray'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

这是我试图最小化的功能

def eb_func(theta, n, e):
    """
    # Function to be Minimized

    :param theta: float
    :param n: Pandas.Series
    :param e: Pandas.Series
    :return: float

    """
    print("Entering EB_Func")
    res = res = np.prod(theta[4] * neg_bin(n, e, theta[0], theta[1]) + (1 - theta[4]) * neg_bin(n, e, theta[2], theta[3]))
    return res

这是我的 neg_bin 函数:

@pandas_udf('double', PandasUDFType.SCALAR)
def neg_bin(n, e, alpha, beta):
    """

    :param n:
    :param e:
    :param alpha:
    :param beta:
    :return:
    """
    res_expo = gammaln(alpha + n) - gammaln(n + 1) - gammaln(alpha)
    res = np.exp(res_expo)
    res = res / (1 + beta / (e + 0.01)) ** n
    res = res / (1 + e / beta) ** alpha
    return res

这些是我的参数:

x0 = np.array([0.9, 0.5, 2.5, 5, 0.33])
bounds = ([0.000001, 200], [0.000001, 200], [0.000001, 200], [0.000001, 200], [0.000001, 1])

这是我试图调用scipy.optimize.minimize函数的地方。

# Define a function to call minimize function
def RunMinimize(data):
    Result = minimize(eb_func, x0, args=(data.Adolescent_a, data.Adolescent_e), method='L-BFGS-B', bounds=bounds, options={'disp': True, 'maxiter': 1000, 'eps': np.repeat(1e-4, 5)})
    return Result.x


RunMinimize(df_adol)

我是 PySpark 的新手,我可以在 Pandas 中执行此操作,但现在我有一个庞大的数据集,而 Pandas 需要花费大量时间来处理它。

以下是预期的输出格式:这是我在 Pandas 中得到的输出

[1.00000000e-06, 1.46304225e+00, 1.00000000e-06, 6.39066185e+00, 1.00000000e-06])

我无法将 theta 值传递给 neg_bin 函数。因为 neg_bin 函数只需要 pandas.Series 作为输入。如果可能的话,我正在寻找一种解决方法,将 theta 值作为标量连同 pandas.Series 作为输入发送到 neg_bin 函数。

任何帮助表示赞赏。TIA。

4

0 回答 0