2

我可以使用 Jupyter 实验室连接到远程托管的数据块 Spark 集群吗?

有关于 databricks 连接的知识库文章,它允许 scala 或 java 客户端进程控制 spark 集群。这是一个示例:
https ://docs.databricks.com/dev-tools/databricks-connect.html

虽然那篇知识库文章涵盖了很多场景,但它没有解释如何使用 Jupyter 笔记本与使用Scala编程语言的数据块集群进行交互。我熟悉 scala 编程,但不熟悉 Python。

4

3 回答 3

3

是的,这似乎是可能的,尽管它没有得到很好的记录。这些步骤在 Windows 上对我有用。我在 scala 2.12.10 中使用了 databricks v.7.1。

步骤 1。安装 anaconda:https ://repo.anaconda.com/

步骤 2。因为 python 似乎是笔记本的首选语言,所以
您需要手动安装和配置一个 scala 内核
,我可以使用 almond 内核:https
://almond.sh/ 当您安装 almond 时,请仔细选择
与您将在远程集群中连接的 DBR 运行时相对应的 scala 版本。

步骤 3。现在按照 databricks-connect 文档获取一个 scala 程序来
编译并通过 intellij / sbt 环境连接到远程集群。
文档可以在这里找到。 https://docs.databricks.com/dev-tools/databricks-connect.html
这是一种完全受支持且相当传统的方法,可用于开发自定义模块。

第 4 步。一旦你创建了一个工作的 scala 进程,你就会熟悉 sbt。build.sbt 用于引用“databricks-connect”分布。分布将在这样的位置:

unmanagedBase := new java.io.File("C:\\Users\\minime\\AppData\\Local\\Programs\\Python\\Python37\\Lib\\site-packages\\pyspark\\jars")

虽然 intellij / sbt 将这些依赖项编译到您的程序中很简单,但在 almond/jupyter 内核中做同样的事情需要更多的工作。

在你回到你的 jupyter notebook 之前,运行你的新 scala 进程并允许它创建一个 spark 会话。然后在进程终止之前,使用“进程资源管理器”找到相关的java.exe,然后在下部视图/窗格中显示句柄,然后将所有句柄复制到记事本中(进程资源管理器中的Ctrl+A,记事本中的Ctrl+V )。这为您提供了 databricks 分发中的模块子集,这些模块在运行时实际加载到您的进程中。

步骤 5。现在您已经有了相关的模块,您需要配置您的 almond scala 内核以将它们加载到内存中。创建一个新的 jupyter notebook 并选择 scala 内核并使用如下代码加载所有模块:

interp.load.cp(ammonite.ops.Path(java.nio.file.FileSystems.getDefault().getPath( "C:/Users/minime/AppData/Local/Programs/Python/Python37/Lib/site-packages/pyspark/jars/whatever001-1.1.1.jar")))
interp.load.cp(ammonite.ops.Path(java.nio.file.FileSystems.getDefault().getPath( "C:/Users/minime/AppData/Local/Programs/Python/Python37/Lib/site-packages/pyspark/jars/whatever002-1.1.1.jar")))
interp.load.cp(ammonite.ops.Path(java.nio.file.FileSystems.getDefault().getPath( "C:/Users/minime/AppData/Local/Programs/Python/Python37/Lib/site-packages/pyspark/jars/whatever003-1.1.1.jar")))
...

请注意,发行版中有很多很多的罐子(可能有 100 个!?)。

您可能希望直接从 maven 加载其他库(假设它们与 scala 2.12.10 和您的 databricks-connect 发行版兼容)

// Microsoft JDBC
 interp.load.ivy("com.microsoft.sqlserver" % "mssql-jdbc" % "8.2.1.jre8")


// Other libraries
 interp.load.ivy("joda-time" % "joda-time" % "2.10.5")
 interp.load.ivy("org.scalaj" %% "scalaj-http" % "2.3.0")
 interp.load.ivy("org.json4s" %% "json4s-native" % "3.5.3")
 interp.load.ivy("com.microsoft.azure"  % "msal4j"   % "1.6.1")


// Other libraries
interp.load.ivy("org.apache.hadoop" % "hadoop-azure" % "3.2.1")

公平的警告......当将库加载到杏仁内核中时,有时以特定顺序加载它们很重要。我上面的示例并不是要告诉您通过 interp.load 加载它们的顺序。

第 6 步。如果一切按计划进行,您现在应该能够使用类似于您在上面“第 3 步”中编写的内容的代码创建在 jupyter 笔记本中运行的 spark 会话。

import org.apache.spark.sql._
val p_SparkSession = SparkSession.builder()
        .appName("APP_" + java.util.UUID.randomUUID().toString)
        .master("local") 
        .config("spark.cores.max","4") 
        .getOrCreate()

您的杏仁内核现在已通过 databricks-connect 分发连接到远程集群。只要您不需要将任何函数或数据类型序列化到远程集群,一切都可以正常工作。在这种情况下,您可能会遇到各种序列化错误和空指针异常。这是一个例子:

java.lang.NullPointerException com.databricks.service.SparkServiceClassSync$.checkSynced(SparkServiceClassSync.scala:244) org.apache.spark.sql.util.SparkServiceObjectOutputStream.writeReplaceClassDescriptor(SparkServiceObjectOutputStream.scala:82) ... org.apache.spark .sql.util.ProtoSerializer.serializePlan(ProtoSerializer.scala:377) com.databricks.service.SparkServiceRPCClientStub.$anonfun$executePlan$1(SparkServiceRPCClientStub.scala:193)

这个答案将是几个中的第一个。我希望有其他 scala/spark/databricks 专家可以帮助解决此配置中的剩余问题,以便远程集群也可以使用我的笔记本中声明的任何函数和数据类型!

于 2020-09-04T00:41:07.027 回答
0

在我的第一个答案中,我指出使用 scala 笔记本(在 Jupyter 实验室中使用杏仁)的主要挑战是我们缺少序列化任何函数或数据类型并将它们发送到由托管的远程集群的功能数据块。

我应该指出,当我遇到此限制时,我经常使用两种解决方法。

  • 我恢复使用"spark-shell"。它是 databricks-connect 发行版的标准组件。然后我可以使用 :load 和 :paste 命令加载我的 scala 代码的相关部分。出于某种令人高兴的原因,“spark-shell”完全能够序列化函数和数据类型,以便动态地将它们发送到远程集群。这是杏仁核在 Jupyter notebooks 的上下文中无法为我们做的事情。

  • 另一种解决方法是将.collect()数据帧返回到驱动程序(在 jupyter notebook 内核的内存中)。一旦它们被收集,我可以对它们执行额外的转换,即使借助“原始”函数和“原始”数据类型,仅在我的 jupyter 笔记本中找到。在这种情况下,我不会获得分布式处理的性能优势。但是,虽然代码仍在开发中,但我通常不会使用非常大的数据集,因此如果驱动程序正在运行我的功能,或者如果工作人员正在运行,它并没有太大的区别。

希望这很清楚。我希望 Databricks 最终会看到允许 scala 程序员在 jupyter 实验室中远程开发代码的好处。我认为他们需要选择其中一个 scala 内核,并进行繁重的工作以支持这种情况。到目前为止,他们可能认为自己在自己的门户中的笔记本体验足以满足所有 scala 程序员的需求。

于 2020-09-07T18:50:46.927 回答
0

为了补充大卫的第一个答案,我做了这个额外的步骤:

步骤 5.5。以编程方式将 databricks jar 依赖项添加到 scala 内核。

使用从我获得的目录,databricks-connect get-jar-dir我使用了以下代码:

import $ivy.`com.lihaoyi::os-lib:0.2.7`

def importJars{

    val myJars = os.list(os.Path("/Users/me/miniconda3/envs/dbx-p40/lib/python3.7/site-packages/pyspark/jars/"))

    for (j <- myJars){
        interp.load.cp(ammonite.ops.Path(java.nio.file.FileSystems.getDefault().getPath(j.toString)))
    }
}

importJars
于 2021-09-20T01:55:10.950 回答