4

我编写了以下 MyPythonGateway.java 以便可以从 Python 调用我的自定义 java 类:

public class MyPythonGateway {

    public String findMyNum(String input) {
        return MyUtiltity.parse(input).getMyNum(); 
    }

    public static void main(String[] args) {
        GatewayServer server = new GatewayServer(new MyPythonGateway());
        server.start();
    }
}

这是我在 Python 代码中使用它的方式:

def main():

    gateway = JavaGateway()                   # connect to the JVM
    myObj = gateway.entry_point.findMyNum("1234 GOOD DAY")
    print(myObj)


if __name__ == '__main__':
    main()

现在我想使用MyPythonGateway.findMyNum()PySpark 中的函数,而不仅仅是一个独立的 python 脚本。我做了以下事情:

myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
print(myNum)

但是,我收到以下错误:

... line 43, in main:
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
  File "/home/edamameQ/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.

那么我在这里错过了什么?我不知道在使用 pyspark 时是否应该运行 MyPythonGateway 的单独 JavaApplication 来启动网关服务器。请指教。谢谢!


下面正是我需要的:

input.map(f)

def f(row):
   // call MyUtility.java 
   // x = MyUtility.parse(row).getMyNum()
   // return x

解决这个问题的最佳方法是什么?谢谢!

4

4 回答 4

3

首先,您看到的错误通常意味着您尝试使用的类不可访问。所以很可能这是一个CLASSPATH问题。

关于总体思路,有两个重要问题:

  • 您无法SparkContext在操作或转换中访问,因此使用 PySpark 网关将不起作用(有关详细信息,请参阅如何从操作或转换中使用 Java/Scala 函数?)。如果你想从 worker 中使用 Py4J,你必须在每台 worker 机器上启动一个单独的网关。
  • 你真的不想以这种方式在 Python 和 JVM 之间传递数据。Py4J 不是为数据密集型任务而设计的。
于 2016-02-28T21:47:11.160 回答
2

在开始调用方法之前在 PySpark 中 -

myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")

您必须按如下方式导入 MyPythonGateway java 类

java_import(sparkContext._jvm, "myPackage.MyPythonGateway")
myPythonGateway  = spark.sparkContext._jvm.MyPythonGateway()
myPythonGateway.findMyNum("1234 GOOD DAY")

在spark-submit中使用--jars选项指定包含 myPackage.MyPythonGateway 的 jar

于 2018-08-08T21:02:53.210 回答
1

例如,如果input.map(f)将输入作为 RDD,这可能会起作用,因为您无法在执行程序中访问 JVM 变量(附加到 spark 上下文)以获取 RDD 的映射函数(据我所知,@transient lazy val在 pyspark中没有等效项)。

def pythonGatewayIterator(iterator):
    results = []
    jvm = py4j.java_gateway.JavaGateway().jvm
    mygw = jvm.myPackage.MyPythonGateway()
    for value in iterator:
        results.append(mygw.findMyNum(value))
    return results


inputs.mapPartitions(pythonGatewayIterator)
于 2019-09-10T18:09:47.363 回答
0

您需要做的就是编译 jar 并使用 --jars 或 --driver-class-path spark 提交选项添加到 pyspark 类路径。然后使用以下代码访问类和方法-

sc._jvm.com.company.MyClass.func1()

其中 sc - 火花上下文

使用 Spark 2.3 测试。请记住,您只能从驱动程序而不是执行程序调用 JVM 类方法。

于 2020-02-03T08:23:17.993 回答