
How do I configure the environment to submit a PyDeequ job from a Jupyter notebook to Spark/YARN in client mode? There is no comprehensive explanation other than for AWS-hosted environments. How should the environment be set up for use outside AWS?

Simply following the examples, such as Testing data quality at scale with PyDeequ, causes an error like TypeError: 'JavaPackage' object is not callable:

from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .addAnalyzer(ApproxCountDistinct("review_id")) \
    .addAnalyzer(Mean("star_rating")) \
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
    .addAnalyzer(Correlation("total_votes", "star_rating")) \
    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_499599/1388970492.py in <module>
      1 from pydeequ.analyzers import *
----> 2 analysisResult = AnalysisRunner(spark) \
      3     .onData(df) \
      4     .addAnalyzer(Size()) \
      5     .addAnalyzer(Completeness("review_id")) \

~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in onData(self, df)
     50         """
     51         df = ensure_pyspark_df(self._spark_session, df)
---> 52         return AnalysisRunBuilder(self._spark_session, df)
     53 
     54 

~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in __init__(self, spark_session, df)
    122         self._jspark_session = spark_session._jsparkSession
    123         self._df = df
--> 124         self._AnalysisRunBuilder = self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder(df._jdf)
    125 
    126     def addAnalyzer(self, analyzer: _AnalyzerObject):

TypeError: 'JavaPackage' object is not callable

1 Answer


The error TypeError: 'JavaPackage' object is not callable means that py4j could not resolve com.amazon.deequ.analyzers.runners.AnalysisRunBuilder to an actual Java class, i.e. the Deequ jar is not visible to the Spark JVM. Running against YARN from a notebook additionally needs the Hadoop client configuration and the PySpark modules, set up as follows.

HADOOP_CONF_DIR

Copy the contents of $HADOOP_HOME/etc/hadoop on the Hadoop/YARN master node to the local host, and set the HADOOP_CONF_DIR environment variable to point to that directory. As the Spark on YARN documentation says:

Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration.

os.environ['HADOOP_CONF_DIR'] = "/opt/hadoop/hadoop-3.2.2/etc/hadoop"
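
As a quick sanity check (a minimal sketch, not part of the original answer; the file names are the standard Hadoop client configs), confirm the copied directory actually contains the client configuration:

import os

# Hypothetical check: the copied directory should hold the standard
# Hadoop/YARN client configuration files.
conf_dir = os.environ['HADOOP_CONF_DIR']
for name in ("core-site.xml", "hdfs-site.xml", "yarn-site.xml"):
    path = os.path.join(conf_dir, name)
    print(name, "found" if os.path.exists(path) else "MISSING")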

PYTHONPATH

The pyspark Python module needs to be loadable. Either install pyspark with pip or conda, which also installs the Spark runtime libraries (for standalone use), or copy the pyspark Python module from the Spark installation under $SPARK_HOME/python/lib.

Make sure the SPARK_HOME environment variable points to the directory where the tar file has been extracted. Update the PYTHONPATH environment variable such that it can find the PySpark and Py4J modules under SPARK_HOME/python/lib. One way of doing this is shown below:

export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH

Or, equivalently, from inside the notebook:

import sys

sys.path.extend([
    "/opt/spark/spark-3.1.2/python/lib/py4j-0.10.9-src.zip",
    "/opt/spark/spark-3.1.2/python/lib/pyspark.zip"
])
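
To verify the setup (a minimal sketch, not from the original answer), import the modules and print the version:

import pyspark
import py4j  # bundled with Spark under $SPARK_HOME/python/lib

# Both imports succeed once PYTHONPATH (or sys.path) is set correctly.
print(pyspark.__version__)  # e.g. 3.1.2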

PyDeequ

Install pydeequ with pip or conda. Note that this alone is not sufficient to use pydeequ: the Deequ jar file is still required.
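
The installed pydeequ package exposes the Maven coordinates used later in the SparkSession configuration; inspecting them is a quick way to see which Deequ build it expects:

import pydeequ

# Maven coordinate of the Deequ jar pydeequ expects for this Spark version.
print(pydeequ.deequ_maven_coord)
# Artifact pydeequ excludes to avoid dependency conflicts.
print(pydeequ.f2j_maven_coord)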

Deequ JAR file

Put the Deequ jar on the library path: to use PyDeequ, the deequ jar file is required. Download the build matching your Spark/Deequ version from the Maven repository com.amazon.deequ.

import os
import sys

# Download the Deequ build for Spark 3.1 from Maven Central into ./jar.
root = os.path.dirname(os.path.realpath(os.getcwd()))
deequ_jar = "https://repo1.maven.org/maven2/com/amazon/deequ/deequ/2.0.0-spark-3.1/deequ-2.0.0-spark-3.1.jar"
classpath = f"{root}/jar/deequ-2.0.0-spark-3.1.jar"

!wget -q -O $classpath $deequ_jar
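
Since wget -q is silent, it is worth confirming the download landed (a sketch; assumes the jar directory already exists):

import os

# Fail fast if the jar is missing; an empty or absent file would reproduce
# the 'JavaPackage' error later.
assert os.path.isfile(classpath), f"Deequ jar not found at {classpath}"
print(os.path.getsize(classpath), "bytes")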

SparkSession

Point the Spark jar properties at the Deequ jar file, as below:

import pydeequ
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('yarn') \
    .config('spark.submit.deployMode', 'client') \
    .config("spark.driver.extraClassPath", classpath) \
    .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
    .config('spark.debug.maxToStringFields', 100) \
    .config('spark.executor.memory', '2g') \
    .getOrCreate()
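
To confirm the jar is visible to the driver JVM (a hedged check, not part of the original answer): py4j resolves the Deequ entry point to a JavaClass when the jar is loaded, and to the bare JavaPackage from the original error when it is not.

from py4j.java_gateway import JavaClass

# JavaClass means the Deequ jar is on the driver classpath; a plain
# JavaPackage here would reproduce the original TypeError.
cls = spark._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder
print(isinstance(cls, JavaClass))  # expect True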

Running the Deequ job

Use an excerpt of the Amazon product review data.

df = spark.read.csv(
    path=f"file:///{root}/data/amazon_product_reviews.csv.gz",
    header=True,
)
df.printSchema()
-----
root
 |-- review_id: string (nullable = true)
 |-- marketplace: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- product_category: string (nullable = true)
from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .addAnalyzer(ApproxCountDistinct("review_id")) \
    .addAnalyzer(Mean("star_rating")) \
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
    .addAnalyzer(Correlation("total_votes", "star_rating")) \
    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
-----
21/08/16 11:17:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                
+-------+---------------+-------------------+------+
| entity|       instance|               name| value|
+-------+---------------+-------------------+------+
| Column|      review_id|       Completeness|   1.0|
| Column|      review_id|ApproxCountDistinct|1040.0|
|Dataset|              *|               Size|1000.0|
| Column|top star_rating|         Compliance| 0.657|
+-------+---------------+-------------------+------+
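
Note that only four of the seven requested metrics appear above. A likely reason (an inference, not stated in the original answer) is that Mean and Correlation need numeric input, while every column was read as string; casting the columns before the run should surface the remaining metrics:

from pyspark.sql.functions import col

# Hypothetical fix: give the numeric analyzers properly typed columns.
df_typed = (df
    .withColumn("star_rating", col("star_rating").cast("double"))
    .withColumn("total_votes", col("total_votes").cast("long"))
    .withColumn("helpful_votes", col("helpful_votes").cast("long")))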
Answered 2021-08-16T01:26:14.880