0

我在维护一个大列表的地方有以下代码:我在这里所做的是检查数据流并创建一个倒排索引。我使用 twitter scalding api 并且 dataTypePipe 是 TypedPipe 的类型

  lazy val cats = dataTypePipe.cross(cmsCats)
  .map(vf => (vf._1.itemId, vf._1.leafCats, vf._2))
    .flatMap {
    case (id, categorySet, cHhitters) => categorySet.map(cat => (
    ...
  }
    .filter(f => f._2.nonEmpty)
    .group.withReducers(4000)
    .sum
    .map {
    case ((token,bucket), ids) =>
      toIndexedRecord(ids, token, bucket)
  }

由于序列化问题,我将 scala list 转换为 java list 并使用 avro 编写:

  def toIndexedRecord(ids: List[Long], token: String, bucket: Int): IndexRecord = {
     val javaList = ids.map(l => l: java.lang.Long).asJava //need to convert from scala long to java long
     new IndexRecord(token, bucket,javaList)
  }

但问题是列表中保存的大量信息会导致 Java Heap 问题。我相信求和也是这个问题的一个贡献者

2013-08-25 16:41:09,709 WARN org.apache.hadoop.mapred.Child: Error running child
cascading.pipe.OperatorException: [_pipe_0*_pipe_1][com.twitter.scalding.GroupBuilder$$anonfun$1.apply(GroupBuilder.scala:189)] operator Every failed executing operation: MRMAggregator[decl:'value']
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:136)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:39)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
    at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90)
    at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:522)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
    at scala.collection.immutable.List.$plus$plus(List.scala:193)
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:86)
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:84)
    at com.twitter.scalding.KeyedList$$anonfun$sum$1.apply(TypedPipe.scala:264)
    at com.twitter.scalding.MRMAggregator.aggregate(Operations.scala:279)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128)

所以我的问题是我能做些什么来避免这种情况。

4

2 回答 2

3

在 .sum 之前尝试 .forceToReducers。当我们正在缓存值时,这种 OOM 正在地图方面发生。这对你的情况可能没有帮助。

但是,如果列表确实太大,那么可以做的事情真的很少。

于 2014-02-14T20:44:47.180 回答
1

快速但不可扩展的答案:尝试增加mapred.child.java.opts

更好的答案,理解这个问题有点棘手,因为我不知道你的 val 的类型,我不知道是什么fvf因为你没有命名它们。如果您提供所需的最少代码量,以便我可以粘贴到 IDE 中并尝试一下,那么我可能会发现您的问题。

sum可能是 OOM 发生的地方,但这不是导致它的原因 - 重构以不同的方式进行求和将无济于事。

您可能会遇到太大而无法记忆的东西。因此mapred.child.java.opts,除非您完全重组数据,否则这可能是您唯一的解决方案。注意cross电话crossWithTiny,现在意味着:)

于 2013-12-01T13:42:39.810 回答