Spark version-2.3.2
EMR - 5.19.0
8 Executors
Each executor - 5 cores

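What I am trying to do:-
1. There are millions of numpy features (.npy files) in S3.
2. Download all the numpy feature vectors using Spark.
3. Convert them into Spark DataFrames, where the DataFrame schema is feature name (string) and feature vector (VectorUDT).
4. Then use Spark LSH to find similar pairs across the DataFrames.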
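
One detail that may matter for steps 2 and 3: a file written with np.save starts with a header (magic string, version, dtype and shape), so reinterpreting the whole file as raw float32 values would prepend header bytes to the vector. Below is a minimal sketch of decoding such bytes with np.load instead. It assumes the files really were produced by np.save; `npy_bytes_to_array` is a hypothetical helper name, and the sample array is made up for illustration.

```python
import io

import numpy as np


def npy_bytes_to_array(raw):
    """Hypothetical helper: decode the full contents of a file written by np.save."""
    return np.load(io.BytesIO(raw), allow_pickle=False)


# Build an in-memory .npy payload to stand in for one S3 object.
original = np.arange(8, dtype='<f4')
buf = io.BytesIO()
np.save(buf, original)
raw = buf.getvalue()

decoded = npy_bytes_to_array(raw)                # header parsed, 8 values
reinterpreted = np.frombuffer(raw, dtype='<f4')  # header bytes read as floats too

print(decoded.shape, reinterpreted.shape)
```

If the files are plain float32 dumps without the .npy header, reinterpreting the raw bytes is fine and this helper is not needed.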

Code:-

import numpy as np
import pyspark.sql.functions as F
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession \
    .builder \
    .appName('test-app') \
    .getOrCreate()

sc = spark.sparkContext

# Read every file as (path, raw bytes) and key each vector by its file name
# without the extension. np.frombuffer replaces the deprecated np.fromstring.
features_rdd = sc. \
    binaryFiles("s3://input_path/"). \
    map(
        lambda x: (
            x[0].split("/")[-1].split(".")[0],
            Vectors.dense(np.frombuffer(x[1], dtype='<f4'))
        )
    )

FEATURES_RAW_SCHEMA = StructType([
    StructField("image_id", StringType(), True),
    StructField("feature", VectorUDT(), True)
])

features_df = spark.createDataFrame(features_rdd, FEATURES_RAW_SCHEMA)

brp = BucketedRandomProjectionLSH(
    inputCol="feature", outputCol="hashes", seed=12345,
    numHashTables=30, bucketLength=1000
)
model = brp.fit(features_df)

# Self-join: keep pairs whose Euclidean distance is below 50.0.
# The distCol ("similarity_score") holds that distance.
all_pairs_df = model.approxSimilarityJoin(
    features_df, features_df, 50.0, distCol="similarity_score"
).select(
    F.col("datasetA.image_id").alias("image_id1"),
    F.col("datasetB.image_id").alias("image_id2"),
    F.col("similarity_score")
)

all_pairs_df.write.mode("overwrite").json("s3a://output")
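
For context on the LSH parameters above: as far as I understand it, each hash table in BucketedRandomProjectionLSH maps a vector x to floor((x · v) / bucketLength), where v is a random unit vector. The pure-Python sketch below only illustrates that formula and is not Spark's code; `random_unit_vector`, `brp_hash`, the dimension of 128 and the sample data are all hypothetical. It shows that when bucketLength is far larger than the vector norms, every vector lands in bucket 0 or -1, so the hash barely narrows down which pairs get compared.

```python
import math
import random


def random_unit_vector(dim, rng):
    """Draw a Gaussian vector and normalise it to unit length."""
    v = [rng.gauss(0.0, 1.0) for _ in range(dim)]
    norm = math.sqrt(sum(c * c for c in v))
    return [c / norm for c in v]


def brp_hash(x, unit_vector, bucket_length):
    """Bucket index of x for one hash table: floor(dot(x, v) / bucket_length)."""
    dot = sum(a * b for a, b in zip(x, unit_vector))
    return int(math.floor(dot / bucket_length))


rng = random.Random(12345)
dim = 128  # hypothetical feature dimension
features = [[rng.uniform(-1.0, 1.0) for _ in range(dim)] for _ in range(1000)]
v = random_unit_vector(dim, rng)

# |dot(x, v)| <= ||x|| < 1000, so the quotient lies in (-1, 1).
buckets_wide = {brp_hash(x, v, 1000.0) for x in features}
# A bucket length closer to the scale of the projections spreads vectors out.
buckets_narrow = {brp_hash(x, v, 0.5) for x in features}

print(sorted(buckets_wide))   # at most two buckets, a subset of {-1, 0}
print(len(buckets_narrow))    # more distinct buckets than the wide setting
```

My understanding is also that approxSimilarityJoin expands each row once per hash table before joining on the hash values, so numHashTables=30 multiplies the shuffled rows by 30. Combined with buckets this coarse, that would mean a very large intermediate result being spilled to local disk.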

Error:-

19/01/10 11:20:11 WARN TaskSetManager: Lost task 120.0 in stage 3.0 (TID 11, ip-ip1.ec2.internal, executor 5): java.io.IOException: No space left on device
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:252)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:133)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:498)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:222)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.createWithExistingInMemorySorter(UnsafeExternalSorter.java:111)
    at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:156)
    at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:248)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.agg_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$2.hasNext(WholeStageCodegenExec.scala:633)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Submit script:-

sudo -i spark-submit --verbose --deploy-mode cluster --num-executors 11 --executor-cores 5 --executor-memory 8G --driver-cores 5 --driver-memory 8G --conf spark.dynamicAllocation.enabled=false --conf spark.yarn.executor.memoryOverhead=2048 /home/hadoop/filename.py

In some cases the executors die with a SIGTERM signal, which I can see in the log files. What am I doing wrong here?
