Spark version - 2.3.2
EMR - 5.19.0
8 executors
5 cores per executor
What I am trying to do:-
1. There are millions of numpy features (.npy files) in S3.
2. Download all the numpy feature vectors using Spark.
3. Convert them into Spark DataFrames, where the DataFrame schema is the feature name (string) and the feature vector (VectorUDT); see the small decoding sketch after this list.
4. Then compute LSH between the DataFrames using Spark.
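For reference, the per-file decoding step looks roughly like the standalone sketch below (no Spark job involved; the synthetic 4-dim array just stands in for one of the S3 files, np.frombuffer treats the bytes as a raw little-endian float32 buffer in the same way as the np.fromstring call in the job further down, and np.load parses the .npy header instead):

import io
import numpy as np
from pyspark.ml.linalg import Vectors

# Standalone illustration of the bytes -> vector step (no Spark involved).
# A synthetic 4-dim float32 feature stands in for one of the S3 files.
buf = io.BytesIO()
np.save(buf, np.array([0.1, 0.2, 0.3, 0.4], dtype='<f4'))
raw_bytes = buf.getvalue()  # what binaryFiles would hand back for one file

# Same interpretation as the job below: raw little-endian float32 buffer.
vec_raw = Vectors.dense(np.frombuffer(raw_bytes, dtype='<f4'))

# Alternative that parses the .npy header instead of treating the bytes as raw floats.
vec_npy = Vectors.dense(np.load(io.BytesIO(raw_bytes)))

print(len(vec_raw), len(vec_npy))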
Code:-
import numpy as np
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.sql import SparkSession
from pyspark import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.ml.linalg import VectorUDT
spark = SparkSession \
    .builder \
    .appName('test-app') \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
features_rdd = sc.binaryFiles("s3://input_path/").map(
    lambda x: (
        x[0].split("/")[-1].split(".")[0],
        Vectors.dense(np.fromstring(x[1], dtype='<f4'))
    )
)
FEATURES_RAW_SCHEMA = StructType([
    StructField("image_id", StringType(), True),
    StructField("feature", VectorUDT(), True)
])
features_df = sqlContext.createDataFrame(features_rdd, FEATURES_RAW_SCHEMA)
brp = BucketedRandomProjectionLSH(inputCol="feature", outputCol="hashes", seed=12345,
                                  numHashTables=30, bucketLength=1000)
model = brp.fit(features_df)
all_pairs_df = model.approxSimilarityJoin(features_df, features_df, 50.0, distCol="similarity_score").select(
    F.col("datasetA.image_id").alias("image_id1"),
    F.col("datasetB.image_id").alias("image_id2"),
    F.col("similarity_score")
)
all_pairs_df.write.mode("overwrite").json("s3a://output")
Error:-
19/01/10 11:20:11 WARN TaskSetManager: Lost task 120.0 in stage 3.0 (TID 11, ip-ip1.ec2.internal, executor 5): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:252)
at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:133)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:498)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:222)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.createWithExistingInMemorySorter(UnsafeExternalSorter.java:111)
at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:156)
at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:248)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.agg_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$2.hasNext(WholeStageCodegenExec.scala:633)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Submit script:-
sudo -i spark-submit --verbose --deploy-mode cluster --num-executors 11 --executor-cores 5 --executor-memory 8G --driver-cores 5 --driver-memory 8G --conf spark.dynamicAllocation.enabled=false --conf spark.yarn.executor.memoryOverhead=2048 /home/hadoop/filename.py
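As a sanity check, this is the YARN memory request I believe these flags translate to (pure arithmetic on the parameters above, nothing measured):

# Memory requested from YARN per the submit flags above.
executor_memory_gb = 8    # --executor-memory 8G
overhead_gb = 2           # spark.yarn.executor.memoryOverhead=2048 (MB)
num_executors = 11        # --num-executors 11

container_gb = executor_memory_gb + overhead_gb  # 10 GB per executor container
total_gb = num_executors * container_gb          # 110 GB across all executors
print(container_gb, total_gb)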
In some cases the executors die with a SIGTERM signal, which I can see in the log files. What am I doing wrong here?