3

我试图使用 py4j 在 pyspark 中运行 java 函数。Py4j 允许访问 JVM 中的 java 对象。我创建了另一个 JVM 实例,并且能够成功运行 java 函数。

py4j 通过 GatewayServer 实例启用此通信。

我想知道我们是否可以以某种方式访问​​ spark 的内部 JVM 来运行我的 java 函数?spark 中 py4j Gatewayserver 的入口点是什么?如何将我的函数添加到入口点?

4

2 回答 2

4

我不确定这是否是您需要的,但我见过两个地方:

sc._gateway.jvm

可用于 java_import 或直接使用

sc._jvm

因此,要访问包 abc 中的类 X,您可以执行以下操作之一:

jvm = sc._gateway.jvm
java_import(jvm,"a.b.c.X")
instance = a.b.c.X()

或更直接地:

instance = sc._jvm.a.b.c.X()

要添加一个java 函数,您需要确保它在类路径中,如果您想在worker 中使用它(例如在UDF 中),那么您需要将它发送给worker。为此,您可以使用 --driver-class-path 开关来 spark-submit(或 pyspark)添加到驱动程序和 --jars 以发送给工作人员。

于 2016-03-10T11:28:07.567 回答
1

看着

$SPARK_HOME/python/pyspark/java_gateway.py

您将在那里看到用于与 Java/Scala 后端接口的机制。

您将需要更新一个或多个 Java 文件,如下所示:

java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
# TODO(davies): move into sql
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")

这些代表Spark-Java入口点。

Pyspark使用Spark-Java入口点而不是直接进入 Scala。您需要 - (a) 在这些 API 类中使用现有的或 - (b) 在这些类中添加新入口点并构建您自己的 Spark 版本

于 2016-03-03T18:03:38.197 回答