
I have a Spark job in Scala that reads from Cloud Storage, performs some operations, and writes to BigQuery. It runs fine locally (master local[*], deploy mode client), but on Dataproc it allocates executors, appears to get past the planning stage, and then hangs when it tries to execute the job.
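For context, a minimal sketch of how the context is presumably set up; the names and configuration below are assumptions for illustration, not the actual job code. Locally the job is launched with master local[*] and deploy mode client, while on Dataproc the master is supplied by the cluster launcher:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical context setup (illustrative, not the actual job code).
// Locally: --master local[*] --deploy-mode client; on Dataproc the master
// comes from the launcher, so it is only defaulted here, never forced.
val conf = new SparkConf()
  .setAppName("MyProcessingJob")
  .setIfMissing("spark.master", "local[*]") // used only when no master is supplied
val sc = new SparkContext(conf)
```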

Code:

val groupedEventProductRDD = productCategorizationRDD
      .join(eventProductPairRDD, numPartitions = 16)
      .map(x => ((x._2._2, x._2._1),1)) // Key, Value = ((event_id, categorization), 1)
      .reduceByKey(_ + _) // Sum for all (event_id, categorization)
      .map(x => (x._1._1, (x._1._2, x._2))) // Key, Value = (event_id, (categorization, count))
      .groupByKey()
      .map(x => categorizeEvent(x._1, x._2))
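The two input RDDs and the `categorizeEvent` helper are not shown in the snippet; a minimal sketch of the shapes they would need to have for the pipeline above to compile is given below. All names and types here are assumptions, not the actual definitions:

```scala
import org.apache.spark.rdd.RDD

// Assumed shapes (hypothetical): both RDDs are keyed by a product identifier.
// (product_id, categorization) pairs, e.g. loaded from Cloud Storage / BigQuery
val productCategorizationRDD: RDD[(String, String)] = ???
// (product_id, event_id) pairs for every event that touched a product
val eventProductPairRDD: RDD[(String, String)] = ???

// Collapses the per-event categorization counts into one record per event,
// here (as an illustrative guess) by picking the most frequent categorization.
def categorizeEvent(eventId: String, counts: Iterable[(String, Int)]): (String, String) = {
  val (topCategorization, _) = counts.maxBy(_._2)
  (eventId, topCategorization)
}
```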

Timeline: (screenshot omitted)

Logs:

16/04/11 23:15:26 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.4.4-hadoop2
16/04/11 23:15:26 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: Opening proxy : spark-node-w-3.c.my-projectid-345.internal:36839
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl: Received new token for : spark-node-w-2.c.my-projectid-345.internal:32909
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl: Received new token for : spark-node-w-4.c.my-projectid-345.internal:46672
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: Opening proxy : spark-node-w-2.c.my-projectid-345.internal:32909
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: Opening proxy : spark-node-w-4.c.my-projectid-345.internal:46672
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl: Received new token for : spark-node-w-0.c.my-projectid-345.internal:34557
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl: Received new token for : spark-node-w-1.c.my-projectid-345.internal:55874
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: Opening proxy : spark-node-w-1.c.my-projectid-345.internal:55874
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: Opening proxy : spark-node-w-0.c.my-projectid-345.internal:34557
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
16/04/11 23:15:27 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: Opening proxy : spark-node-w-3.c.my-projectid-345.internal:36839
16/04/11 23:15:29 INFO com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration: Using specified project-id 'my-projectid-123' for input
16/04/11 23:15:29 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
16/04/11 23:15:29 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
16/04/11 23:15:29 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: Opening proxy : spark-node-w-0.c.my-projectid-345.internal:34557
16/04/11 23:15:29 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: Opening proxy : spark-node-w-2.c.my-projectid-345.internal:32909
16/04/11 23:15:29 WARN org.apache.spark.rdd.NewHadoopRDD: Caching NewHadoopRDDs as deserialized objects usually leads to undesired behavior because Hadoop's RecordReader reuses the same Writable object for all records. Use a map transformation to make copies of the records.
16/04/11 23:15:29 INFO com.company.division.subjectarea.job.processing.MyProcessingJob$: Join columns are category,subcategory,department,tier1,tier2
16/04/11 23:15:30 INFO com.company.division.subjectarea.job.processing.MyProcessingJob$: Reloading daily_customer_fact.product_category
16/04/11 23:15:30 INFO com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration: Using specified project-id 'my-projectid-123' for output
16/04/11 23:15:30 INFO com.google.cloud.hadoop.io.bigquery.BigQueryFactory: Bigquery connector version 0.7.4-hadoop2
16/04/11 23:15:30 INFO com.google.cloud.hadoop.io.bigquery.BigQueryFactory: Creating BigQuery from default credential.
16/04/11 23:15:30 INFO com.google.cloud.hadoop.io.bigquery.BigQueryFactory: Creating BigQuery from given credential.
16/04/11 23:16:30 WARN org.apache.spark.deploy.yarn.YarnAllocator: Expected to find pending requests, but found none.
16/04/11 23:16:30 WARN org.apache.spark.deploy.yarn.YarnAllocator: Expected to find pending requests, but found none.
16/04/11 23:16:30 WARN org.apache.spark.deploy.yarn.YarnAllocator: Expected to find pending requests, but found none.
16/04/11 23:16:30 WARN org.apache.spark.deploy.yarn.YarnAllocator: Expected to find pending requests, but found none.
16/04/11 23:16:31 WARN org.apache.spark.deploy.yarn.YarnAllocator: Expected to find pending requests, but found none.
16/04/11 23:16:31 WARN org.apache.spark.deploy.yarn.YarnAllocator: Expected to find pending requests, but found none.
16/04/11 23:16:31 WARN org.apache.spark.deploy.yarn.YarnAllocator: Expected to find pending requests, but found none.
16/04/11 23:16:32 WARN org.apache.spark.deploy.yarn.YarnAllocator: Expected to find pending requests, but found none.

Main thread stack trace:

org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:330)
com.company.division.subjectarea.job.processing.MyProcessingJob$.main(MyProcessingJob.scala:77)
com.company.division.subjectarea.job.processing.MyProcessingJob.main(MyProcessingJob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
