我试图使用 py4j 在 pyspark 中运行 java 函数。Py4j 允许访问 JVM 中的 java 对象。我创建了另一个 JVM 实例,并且能够成功运行 java 函数。
py4j 通过 GatewayServer 实例启用此通信。
我想知道我们是否可以以某种方式访问 spark 的内部 JVM 来运行我的 java 函数?spark 中 py4j Gatewayserver 的入口点是什么?如何将我的函数添加到入口点?
我试图使用 py4j 在 pyspark 中运行 java 函数。Py4j 允许访问 JVM 中的 java 对象。我创建了另一个 JVM 实例,并且能够成功运行 java 函数。
py4j 通过 GatewayServer 实例启用此通信。
我想知道我们是否可以以某种方式访问 spark 的内部 JVM 来运行我的 java 函数?spark 中 py4j Gatewayserver 的入口点是什么?如何将我的函数添加到入口点?
我不确定这是否是您需要的,但我见过两个地方:
sc._gateway.jvm
可用于 java_import 或直接使用
sc._jvm
因此,要访问包 abc 中的类 X,您可以执行以下操作之一:
jvm = sc._gateway.jvm
java_import(jvm,"a.b.c.X")
instance = a.b.c.X()
或更直接地:
instance = sc._jvm.a.b.c.X()
要添加一个java 函数,您需要确保它在类路径中,如果您想在worker 中使用它(例如在UDF 中),那么您需要将它发送给worker。为此,您可以使用 --driver-class-path 开关来 spark-submit(或 pyspark)添加到驱动程序和 --jars 以发送给工作人员。
看着
$SPARK_HOME/python/pyspark/java_gateway.py
您将在那里看到用于与 Java/Scala 后端接口的机制。
您将需要更新一个或多个 Java 文件,如下所示:
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
# TODO(davies): move into sql
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")
这些代表Spark-Java
入口点。
Pyspark
使用Spark-Java
入口点而不是直接进入 Scala。您需要 - (a) 在这些 API 类中使用现有的或 - (b) 在这些类中添加新入口点并构建您自己的 Spark 版本