2

我正在尝试开始使用DeltaLakesusing Pyspark

为了能够使用 deltalake,我在 Anaconda shell-prompt 上调用 pyspark 作为 —</p>

pyspark — packages io.delta:delta-core_2.11:0.3.0

这是来自 deltalake 的参考资料 — https://docs.delta.io/latest/quick-start.html

delta Lake 的所有命令在 Anaconda shell-prompt 中都能正常工作。

在 jupyter notebook 上,对 deltalake 表的引用会出错。这是我在 Jupyter Notebook 上运行的代码 -

df_advisorMetrics.write.mode("overwrite").format("delta").save("/DeltaLake/METRICS_F_DELTA")
spark.sql("create table METRICS_F_DELTA using delta location '/DeltaLake/METRICS_F_DELTA'")

下面是我在笔记本开始时用来连接到 pyspark 的代码 -

import findspark
findspark.init()
findspark.find()

import pyspark
findspark.find()

以下是我得到的错误:

Py4JJavaError:调用 o116.save 时出错。:java.lang.ClassNotFoundException:找不到数据源:delta。请在http://spark.apache.org/third-party-projects.html找到包

有什么建议么?

4

3 回答 3

3

我创建了一个 Google Colab/Jupyter Notebook 示例,展示了如何运行 Delta Lake。

https://github.com/prasannakumar2012/spark_experiments/blob/master/examples/Delta_Lake.ipynb

它具有运行所需的所有步骤。这使用最新的 spark 和 delta 版本。请相应地更改版本。

于 2020-07-29T01:42:52.767 回答
0

一个潜在的解决方案是使用常规 Jupyter notebook 遵循 Import PySpark 包中提到的技术。

另一个可能的解决方案是下载 delta-core JAR 并将其放在$SPARK_HOME/jars文件夹中,以便在运行jupyter notebook时自动包含 Delta Lake JAR。

于 2019-09-09T05:08:57.043 回答
-1

我一直在 Jupyter 笔记本上使用 DeltaLake。

在运行 Python 3.x 的 Jupyter 笔记本中尝试以下操作。

### import Spark libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

### spark package maven coordinates - in case you are loading more than just delta
spark_packages_list = [
    'io.delta:delta-core_2.11:0.6.1',
]
spark_packages = ",".join(spark_packages_list)

### SparkSession 
spark = (
    SparkSession.builder
    .config("spark.jars.packages", spark_packages)
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
    .getOrCreate()
)

sc = spark.sparkContext

### Python library in delta jar. 
### Must create sparkSession before import
from delta.tables import *

假设您有一个火花数据框df

高密度文件系统

节省

### overwrite, change mode="append" if you prefer
(df.write.format("delta")
.save("my_delta_file", mode="overwrite", partitionBy="partition_column_name")
)

加载

df_delta = spark.read.format("delta").load("my_delta_file")

AWS S3 对象存储

初始 S3 设置

### Spark S3 access
hdpConf = sc._jsc.hadoopConfiguration()
user = os.getenv("USER")

### Assuming you have your AWS credentials in a jceks keystore.
hdpConf.set("hadoop.security.credential.provider.path", f"jceks://hdfs/user/{user}/awskeyfile.jceks")

hdpConf.set("fs.s3a.fast.upload", "true")

### optimize s3 bucket-level parquet column selection
### un-comment to use
# hdpConf.set("fs.s3a.experimental.fadvise", "random")


### Pick one upload buffer option
hdpConf.set("fs.s3a.fast.upload.buffer", "bytebuffer") # JVM off-heap memory
# hdpConf.set("fs.s3a.fast.upload.buffer", "array") # JVM on-heap memory
# hdpConf.set("fs.s3a.fast.upload.buffer", "disk") # DEFAULT - directories listed in fs.s3a.buffer.dir

s3_bucket_path = "s3a://your-bucket-name"
s3_delta_prefix = "delta"  # or whatever

节省

### overwrite, change mode="append" if you prefer
(df.write.format("delta")
.save(f"{s3_bucket_path}/{s3_delta_prefix}/", mode="overwrite", partitionBy="partition_column_name")
)

加载

df_delta = spark.read.format("delta").load(f"{s3_bucket_path}/{s3_delta_prefix}/")

火花提交

不直接回答原始问题,但为了完整起见,您也可以执行以下操作。

将以下内容添加到您的spark-defaults.conf文件中

spark.jars.packages                 io.delta:delta-core_2.11:0.6.1
spark.delta.logStore.class          org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
spark.sql.extensions                io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog     org.apache.spark.sql.delta.catalog.DeltaCatalog

参考 spark-submit 命令中的 conf 文件

spark-submit \
--properties-file /path/to/your/spark-defaults.conf \
--name your_spark_delta_app \
--py-files /path/to/your/supporting_pyspark_files.zip \
--class Main /path/to/your/pyspark_script.py
于 2020-08-21T19:51:17.403 回答