
I am running Spark in standalone mode on 2 machines which have these configs

  1. 500 GB storage, 4 cores, 7.5 GB RAM
  2. 250 GB storage, 8 cores, 15 GB RAM

I have created a master and a slave on the 8-core machine, giving 7 cores to the worker, and another slave on the 4-core machine with 3 worker cores. The UI shows 13.7 GB and 6.5 GB of usable RAM for the 8-core and 4-core machines respectively.

Now on this cluster I have to process an aggregate of user ratings over a period of 15 days. I am trying to do this with PySpark. The data is stored as hour-wise files in day-wise directories in an S3 bucket, and each file is around 100 MB, e.g.

s3://some_bucket/2015-04/2015-04-09/data_files_hour1

I am reading the files like this

a = sc.textFile(files, 15).coalesce(7*sc.defaultParallelism) #to restrict partitions

where files is a string of this form 's3://some_bucket/2015-04/2015-04-09/*,s3://some_bucket/2015-04/2015-04-09/*'

Then I do a series of maps and filters and persist the result

a.persist(StorageLevel.MEMORY_ONLY_SER)

Then I need to do a reduceByKey to get an aggregate score over the span of days.

b = a.reduceByKey(lambda x, y: x+y).map(aggregate)
b.persist(StorageLevel.MEMORY_ONLY_SER)
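
For illustration, here is a minimal toy version of this step as I understand it; toy_aggregate below is only a stand-in for my real aggregate function:

toy = sc.parallelize([
    (("user1", "song_a"), 2.0),
    (("user1", "song_a"), 3.5),   # same user/item pair from another day
    (("user2", "song_b"), 1.0),
])

def toy_aggregate(kv):
    # stand-in for the real aggregate(): flattens ((user, item), total) into (user, item, total)
    (user, item), total = kv
    return (user, item, total)

print(toy.reduceByKey(lambda x, y: x + y).map(toy_aggregate).collect())
# -> [('user1', 'song_a', 5.5), ('user2', 'song_b', 1.0)]  (order may vary)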

Then I need to make a Redis call to get the actual tags for the items the user has rated, so I call mapPartitions like this

final_scores = b.mapPartitions(get_tags)

The get_tags function creates one Redis connection per invocation (i.e. per partition), queries Redis, and yields (user, item, rating) tuples (the Redis hash is stored on the 4-core machine).

I have tweaked the SparkConf settings to:

conf = (SparkConf().setAppName(APP_NAME).setMaster(master)
        .set("spark.executor.memory", "5g")
        .set("spark.akka.timeout", "10000")
        .set("spark.akka.frameSize", "1000")
        .set("spark.task.cpus", "5")
        .set("spark.cores.max", "10")
        .set("spark.serializer",      "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryoserializer.buffer.max.mb", "10")
        .set("spark.shuffle.consolidateFiles", "True")
        .set("spark.files.fetchTimeout", "500")
        .set("spark.task.maxFailures", "5"))

I run the job with a driver-memory of 2g in client mode, since cluster mode doesn't seem to be supported here. The process above takes a long time for 2 days' worth of data (around 2.5 hours) and gives up completely on 14 days'.

What needs improvement here?

  1. Is this infrastructure insufficient in terms of RAM and cores? (This job is offline and can take hours, but it has to finish in 5 hours or so.)
  2. Should I increase/decrease the number of partitions?
  3. Redis could be slowing the system down, but the number of keys is just too huge to fetch in a single call (see the batched-lookup sketch after this list).
  4. I am not sure where the task is failing: in reading the files or in reducing.
  5. Should I avoid Python given the better Spark APIs in Scala, and would that help with efficiency as well?
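
To make point 3 concrete, this is roughly what a batched lookup per partition could look like using redis-py's pipeline. It is only a sketch (it drops the scrape() fallback and materialises the whole partition in memory), not what I currently run:

import json
import redis

def get_tags_batched(partition):
    rh = redis.Redis(host=settings['REDIS_HOST'], port=settings['REDIS_PORT'], db=0)
    batch = list(partition)        # materialises the partition; only fine if partitions are small enough
    pipe = rh.pipeline()
    for user, song, rating in batch:
        pipe.hget(settings['REDIS_HASH'], song)
    raw = pipe.execute()           # one round trip for the whole batch instead of one hget per record
    for (user, song, rating), tags_json in zip(batch, raw):
        if tags_json:
            for tag in json.loads(tags_json):
                yield (user, tag, rating)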

This is the exception trace

Lost task 4.1 in stage 0.0 (TID 11, <node>): java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
    at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
    at sun.security.ssl.InputRecord.read(InputRecord.java:509)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:200)
    at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:103)
    at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:164)
    at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:227)
    at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
    at org.apache.http.util.EntityUtils.consume(EntityUtils.java:88)
    at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.releaseConnection(HttpMethodReleaseInputStream.java:102)
    at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.close(HttpMethodReleaseInputStream.java:194)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.seek(NativeS3FileSystem.java:152)
    at org.apache.hadoop.fs.BufferedFSInputStream.seek(BufferedFSInputStream.java:89)
    at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:63)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:126)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:236)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:212)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)

I could really use some help, thanks in advance

Here is what my main code looks like

def main(sc):
    f = get_files()
    a = (sc.textFile(f, 15)
         .coalesce(7 * sc.defaultParallelism)
         .map(lambda line: line.split(","))
         .filter(lambda line: len(line) > 0)
         .map(lambda line: (line[18], line[2], line[13], line[15]))
         .map(scoring)
         .map(lambda line: ((line[0], line[1]), line[2]))
         .persist(StorageLevel.MEMORY_ONLY_SER))
    b = a.reduceByKey(lambda x, y: x + y).map(aggregate)
    b.persist(StorageLevel.MEMORY_ONLY_SER)
    c = b.mapPartitions(get_tags)
    c.saveAsTextFile("f")
    a.unpersist()
    b.unpersist()

The get_tags function is

def get_tags(partition):
    rh = redis.Redis(host=settings['REDIS_HOST'], port=settings['REDIS_PORT'], db=0)
    for element in partition:
        user = element[0]
        song = element[1]
        rating = element[2]
        tags = rh.hget(settings['REDIS_HASH'], song)
        if tags:
            tags = json.loads(tags)
        else:
            tags = scrape(song, rh)
        if tags:
            for tag in tags:
                yield (user, tag, rating)

The get_files function is:

def get_files():
    paths = get_path_from_dates(DAYS)
    base_path = 's3n://acc_key:sec_key@bucket/'
    files = list()
    for path in paths:
        fle = base_path + path + '/file_format.*'
        files.append(fle)
    return ','.join(files)

The get_path_from_dates(DAYS) is

from datetime import date, timedelta

today = date.today()   # assuming 'today' is the current date

def get_path_from_dates(last):
    days = list()
    t = 0
    while t <= last:
        d = today - timedelta(days=t)
        path = d.strftime('%Y-%m') + '/' + d.strftime('%Y-%m-%d')
        days.append(path)
        t += 1
    return days

2 Answers


As a small optimization, I created two separate jobs: one reads from S3 and computes the summed scores, and the second reads from Redis and does the tag transformation. The first job has many partitions because there are roughly 2,300 files to read. The second has far fewer partitions, to avoid Redis connection latency, and it reads only a single file, which sits on the EC2 cluster itself. This is only a partial fix; I am still looking for suggestions for improvement...
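
A rough sketch of that split (the paths, the intermediate format, and the helper names here are hypothetical; only the two-job structure follows what I did):

# Job 1: read the ~2,300 hourly files from S3 (many partitions), compute the summed
# scores, and write an intermediate result to storage on the cluster.
def job_scores(sc):
    a = (sc.textFile(get_files())
         .map(parse_and_score)                    # stand-in for the parsing/scoring chain above
         .reduceByKey(lambda x, y: x + y)
         .map(aggregate))
    a.saveAsTextFile("/data/intermediate_scores")  # hypothetical path on the cluster

# Job 2: read the single intermediate file (few partitions) and do the Redis lookups,
# so connection latency is amortised over large partitions.
def job_tags(sc):
    scores = sc.textFile("/data/intermediate_scores").map(parse_score_line)  # hypothetical parser
    scores.coalesce(sc.defaultParallelism).mapPartitions(get_tags).saveAsTextFile("/data/final_scores")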

Answered 2015-04-10T09:02:09.630

I ran into a similar case: calling coalesce on an RDD with more than 300,000 partitions. The difference is that I was using s3a (a SocketTimeoutException from S3AFileSystem.waitAysncCopy). It was finally resolved by setting a larger fs.s3a.connection.timeout in Hadoop's core-site.xml. Hope this gives you a clue.
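
If editing core-site.xml is not convenient, the same Hadoop property can also be passed from the Spark side; this is only a sketch, and the timeout value is an arbitrary example:

from pyspark import SparkConf

# spark.hadoop.* properties are forwarded to the Hadoop configuration
conf = SparkConf().set("spark.hadoop.fs.s3a.connection.timeout", "500000")

# or on an already running context (uses the private _jsc handle):
sc._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "500000")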

Answered 2015-07-27T06:37:58.003