19

我正在尝试使用 SparkSession 将文件的 JSON 数据转换为带有 Spark Notebook 的 RDD。我已经有了 JSON 文件。

 val spark = SparkSession
   .builder()
   .appName("jsonReaderApp")
   .config("config.key.here", configValueHere)
   .enableHiveSupport()
   .getOrCreate()
val jread = spark.read.json("search-results1.json")

我对 spark 很陌生,不知道用什么来config.key.hereconfigValueHere.

4

5 回答 5

45

火花会话

要获取 SparkSession 的所有“作为键值对的各种 Spark 参数”,“使用 Dataset 和 DataFrame API 编程 Spark 的入口点”,运行以下命令(这是使用 Spark Python API,Scala 将非常相似) .

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
SparkConf().getAll()

或不导入SparkConf

spark.sparkContext.getConf().getAll()

根据您使用的 API,请参阅以下内容之一:

  1. https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/SparkSession.html
  2. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.html
  3. https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/SparkSession.html

您可以通过运行以下代码获得更深层次的 SparkSession 配置选项列表。大多数是相同的,但还有一些额外的。我不确定你是否可以改变这些。

spark.sparkContext._conf.getAll()  

火花上下文

要获取 SparkContext 的所有“作为键值对的各种 Spark 参数”、“Spark 功能的主要入口点”、...“连接到 Spark 集群”、...以及“创建 RDD、累加器和在该集群上广播变量,”运行以下命令。

import pyspark
from pyspark import SparkConf, SparkContext 
spark_conf = SparkConf().setAppName("test")
spark = SparkContext(conf = spark_conf)
SparkConf().getAll()

根据您使用的 API,请参阅以下内容之一:

  1. https://spark.apache.org/docs/latest/api/scala/org/apache/spark/SparkContext.html
  2. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html
  3. https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkContext.html

火花参数

您应该获得一个包含“作为键值对的各种 Spark 参数”的元组列表,类似于以下内容:

[(u'spark.eventLog.enabled', u'true'),
 (u'spark.yarn.appMasterEnv.PYSPARK_PYTHON', u'/<yourpath>/parcels/Anaconda-4.2.0/bin/python'),
 ...
 ...
 (u'spark.yarn.jars', u'local:/<yourpath>/lib/spark2/jars/*')]

根据您使用的 API,请参阅以下内容之一:

  1. https://spark.apache.org/docs/latest/api/scala/org/apache/spark/SparkConf.html
  2. https://spark.apache.org/docs/latest//api/python/reference/api/pyspark.SparkConf.html
  3. https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkConf.html

有关 Spark 属性的完整列表,请参阅:
http ://spark.apache.org/docs/latest/configuration.html#viewing-spark-properties

设置 Spark 参数

("spark.some.config.option", "some-value")您可以在应用程序中设置每个元组:

火花会话

spark = (
    SparkSession
    .builder
    .appName("Your App Name")
    .config("spark.some.config.option1", "some-value")
    .config("spark.some.config.option2", "some-value")
    .getOrCreate())

sc = spark.sparkContext

火花上下文

spark_conf = (
    SparkConf()
    .setAppName("Your App Name")
    .set("spark.some.config.option1", "some-value")
    .set("spark.some.config.option2", "some-value"))

sc = SparkContext(conf = spark_conf)

火花默认值

您还可以在spark-defaults.conf文件中设置 Spark 参数:

spark.some.config.option1 some-value
spark.some.config.option2 "some-value"

spark-submit然后使用(pyspark)运行您的 Spark 应用程序:

spark-submit \
--properties-file path/to/your/spark-defaults.conf \
--name "Your App Name" \
--py-files path/to/your/supporting/pyspark_files.zip \
--class Main path/to/your/pyspark_main.py
于 2017-11-13T23:24:38.230 回答
7

这就是我在我的 scala 中添加 spark 或 hive 设置的方式:

{
    val spark = SparkSession
        .builder()
        .appName("StructStreaming")
        .master("yarn")
        .config("hive.merge.mapfiles", "false")
        .config("hive.merge.tezfiles", "false")
        .config("parquet.enable.summary-metadata", "false")
        .config("spark.sql.parquet.mergeSchema","false")
        .config("hive.merge.smallfiles.avgsize", "160000000")
        .enableHiveSupport()
        .config("hive.exec.dynamic.partition", "true")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .config("spark.sql.orc.impl", "native")
        .config("spark.sql.parquet.binaryAsString","true")
        .config("spark.sql.parquet.writeLegacyFormat","true")
        //.config(“spark.sql.streaming.checkpointLocation”, “hdfs://pp/apps/hive/warehouse/dev01_landing_initial_area.db”)
        .getOrCreate()
}
于 2019-05-10T17:36:15.263 回答
3

设置一些配置的最简单方法:

spark.conf.set("spark.sql.shuffle.partitions", 500).

wherespark指的是 a SparkSession,这样您就可以在运行时设置配置。当您想要一次又一次地更改配置以调整特定查询的一些 spark 参数时,它非常有用。

于 2021-01-15T19:57:24.997 回答
1

简单来说,“config”方法中设置的值会自动传播到 SparkConf 和 SparkSession 自己的配置中。

例如:您可以参考 https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-settings.html以了解如何使用配置选项为 SparkSession 设置 hive 仓库位置

要了解此 api,您可以参考:https ://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/sql/SparkSession.Builder.html

于 2017-03-26T04:52:15.483 回答
0

每个 Spark 配置选项都在以下位置进行了说明: http ://spark.apache.org/docs/latest/configuration.html

您可以在运行时设置这些,如上面的示例或通过给 spark-submit 的配置文件

于 2017-03-27T17:26:12.533 回答