
I am new to Spark. We have the Hive query below, and on top of it we perform a pivot operation using Spark and Python.

The pyspark script below does the pivot and writes the result to a Hive table. The Hive query returns about 140 million rows.

Approach 1

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
sc = SparkContext()
hc = HiveContext(sc)
tbl = hc.sql("""
    Select Rating.BranchID
    , Rating.Vehicle
    , Rating.PersonalAutoCov
    , Rating.PersonalVehicleCov 
    , Rating.EffectiveDate
    , Rating.ExpirationDate
    , attr.name as RatingAttributeName
    , Cast(Rating.OutputValue as Int) OutputValue
    , Rating.InputValue
    From db.dbo_pcx_paratingdata_piext_master rating
        Inner Join db.dbo_pctl_ratingattrname_piext_master attr 
            on rating.RatingAttribute = attr.id 
            and attr.CurrentRecordIndicator = 'Y'
    Where 
        rating.CurrentRecordIndicator = 'Y'
    """)
tbl.cache()
pvttbl1 = tbl.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
    .pivot("RatingAttributeName")\
    .agg({"InputValue":"max", "OutputValue":"sum"})

pvttbl1.createOrReplaceTempView("paRatingAttributes") 
hc.sql("Create table dev_pekindataaccesslayer.createcount as select * from paRatingAttributes") 

When I run the Approach 1 script with spark-submit, I end up with

java.lang.OutOfMemoryError: Java heap space

or sometimes

java.lang.OutOfMemoryError: GC overhead limit exceeded

The spark-submit command I used:

spark-submit spark_ex2.py --master yarn-cluster --num-executors 15 --executor-cores 50 --executor-memory 100g --driver-memory 100g --conf "spark.sql.shuffle.partitions=1000" --conf "spark.memory.offHeap.enabled=true" --conf "spark.memory.offHeap.size=100g" --conf "spark.network.timeout=1200" --conf "spark.executor.heartbeatInterval=1201"
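One detail worth noting about this command: spark-submit treats everything after the application file (spark_ex2.py here) as arguments to the script itself rather than as options for spark-submit, so the memory and --conf settings above may never reach the driver or the executors. A quick sketch to print what the running application actually received:

from pyspark import SparkContext

# Sketch: dump the effective configuration to confirm whether the
# --executor-memory / --conf settings took effect or were left at defaults.
sc = SparkContext.getOrCreate()
for key, value in sorted(sc.getConf().getAll()):
    print(key, "=", value)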

Detailed log:

INFO MemoryStore: Memory use = 1480.9 KB (blocks) + 364.8 MB (scratch space shared across 40 tasks(s)) = 366.2 MB. 
Storage limit = 366.3 MB.
WARN BlockManager: Persisting block rdd_11_22 to disk instead.
WARN BlockManager: Putting block rdd_11_0 failed due to an exception
WARN BlockManager: Block rdd_11_0 could not be removed as it was not found on disk or in memory
ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 10)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)

I made some small changes to the pyspark script above, and it runs without any problems.

Approach 2

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
sc = SparkContext()
hc = HiveContext(sc)
sqlContext = SQLContext(sc)
tbl = hc.sql("""
    Select Rating.BranchID
    , Rating.Vehicle
    , Rating.PersonalAutoCov
    , Rating.PersonalVehicleCov
    , Rating.EffectiveDate
    , Rating.ExpirationDate
    , attr.name as RatingAttributeName
    , Cast(Rating.OutputValue as Int) OutputValue
    , Rating.InputValue
    From db.dbo_pcx_paratingdata_piext_master rating
        Inner Join db.dbo_pctl_ratingattrname_piext_master attr
            on rating.RatingAttribute = attr.id
            and attr.CurrentRecordIndicator = 'Y'
    Where
        rating.CurrentRecordIndicator = 'Y'
    """)
tbl.createOrReplaceTempView("Ptable")
r = sqlContext.sql("select count(1) from Ptable")
m = r.collect()[0][0]
hc.sql("drop table if exists db.Ptable")
hc.sql("Create table db.Ptable as select * from Ptable")
tb2 = hc.sql("select * from db.Ptable limit " + str(m))
pvttbl1 = tb2.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
    .pivot("RatingAttributeName")\
    .agg({"InputValue":"max", "OutputValue":"sum"})

pvttbl1.createOrReplaceTempView("paRatingAttributes")
hc.sql("drop table if exists db.createcount")
hc.sql("Create table db.createcount STORED AS ORC as select * from paRatingAttributes")

However, the script above involves creating an intermediate table, which is an extra step. In Approach 2, as long as I keep the limit keyword, it runs fine with the same spark-submit command.
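On the write side, the temporary view plus the "Create table ... as select" round trip could likely be replaced with the DataFrame writer; a small sketch, assuming a managed ORC table is acceptable (continuing from pvttbl1 above):

# Sketch: write the pivoted result straight to a Hive table as ORC,
# skipping the temporary view and the CREATE TABLE AS SELECT step.
pvttbl1.write.mode("overwrite").format("orc").saveAsTable("db.createcount")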

What is wrong with my Approach 1, and how can I make it work?

Note: I followed Spark java.lang.OutOfMemoryError: Java heap space and tried all of the conf parameters suggested there, but still no luck.
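In case it helps, the executor-side settings could also be pinned inside the script via SparkConf, so they do not depend on how the spark-submit command line is parsed; a rough sketch with hypothetical sizes:

from pyspark import SparkConf, SparkContext

# Sketch with hypothetical sizes: executor settings applied from code before the
# SparkContext starts (driver memory still has to be set through spark-submit,
# because the driver JVM is already running by the time this code executes).
conf = (SparkConf()
        .setAppName("pa_rating_pivot")                  # hypothetical app name
        .set("spark.executor.memory", "8g")             # hypothetical size
        .set("spark.executor.cores", "5")               # hypothetical value
        .set("spark.sql.shuffle.partitions", "1000"))
sc = SparkContext(conf=conf)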

