
We are trying to publish deltas from a Hive table to Kafka. The table in question is a single-partition, single-block file of 244 MB. Our cluster is configured with a 256M block size, so in this case we are just about at the maximum for a single file.

Every time that table is updated, a copy is archived and then we run our delta process.

In the function below we have isolated the different joins and confirmed that the inner join performs acceptably (about 3 minutes), but the two anti-join DataFrames never complete. We keep throwing more resources at the Spark job and still see the errors shown below.

Is there a practical limit on the size of DataFrames for this kind of join?

  private class DeltaColumnPublisher(spark: SparkSession, sink: KafkaSink, source: RegisteredDataset)
    extends BasePublisher(spark, sink, source) with Serializable {

    val deltaColumn = "hadoop_update_ts" // TODO: move to the dataset object

    def publishDeltaRun(dataLocation: String, archiveLocation: String): (Long, Long) = {

      val current = spark.read.parquet(dataLocation)
      val previous = spark.read.parquet(archiveLocation)

      val inserts = current.join(previous, keys, "leftanti") // rows present only in current
      val updates = current.join(previous, keys)
        .where(current.col(deltaColumn) =!= previous.col(deltaColumn)) // same keys, changed delta column
      val deletes = previous.join(current, keys, "leftanti") // rows present only in previous

      val upsertCounter = spark.sparkContext.longAccumulator("upserts")
      val deleteCounter = spark.sparkContext.longAccumulator("deletes")

      logInfo("sending inserts to kafka")
      sink.sendDeltasToKafka(inserts, "U", upsertCounter)

      logInfo("sending updates to kafka")
      sink.sendDeltasToKafka(updates, "U", upsertCounter)

      logInfo("sending deletes to kafka")
      sink.sendDeltasToKafka(deletes, "D", deleteCounter)

      (upsertCounter.value, deleteCounter.value)
    }
  }
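
For timing, each join can be forced independently, since the DataFrames are lazy until an action runs. A minimal sketch reusing current, previous, keys and deltaColumn from the function above (spark.time is just one convenient timing wrapper, assuming a Spark 2.1+ session):

    // Force each join independently with count() so it can be timed in isolation.
    spark.time(current.join(previous, keys, "leftanti").count())   // inserts
    spark.time(
      current.join(previous, keys)
        .where(current.col(deltaColumn) =!= previous.col(deltaColumn))
        .count())                                                  // updates (inner join)
    spark.time(previous.join(current, keys, "leftanti").count())   // deletes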

The errors we are seeing seem to indicate that the driver is losing contact with the executors. We have increased executor memory to 24G, the network timeout to as high as 900s, and the heartbeat interval to as high as 120s.

17/11/27 20:36:18 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@596e3aa6,BlockManagerId(1, server, 46292, None))] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
    at ...

Later in the log:

17/11/27 20:42:37 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@25d1bd5f,BlockManagerId(1, server, 46292, None))] in 3 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
    at ...
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Could not find HeartbeatReceiver.

The configuration switches we have been manipulating (without success) are --executor-memory 24G --conf spark.network.timeout=900s --conf spark.executor.heartbeatInterval=120s.
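
For completeness, the same settings can also be expressed programmatically. A minimal sketch, assuming the keys are applied before the first SparkSession is created (in our runs they were passed on the spark-submit command line as above, and the app name here is a placeholder):

    import org.apache.spark.sql.SparkSession

    // Programmatic equivalent of the spark-submit flags above; these keys only
    // take effect if set before the first SparkSession/SparkContext is created.
    val spark = SparkSession.builder()
      .appName("delta-publisher")                          // placeholder name
      .config("spark.executor.memory", "24g")
      .config("spark.network.timeout", "900s")
      .config("spark.executor.heartbeatInterval", "120s")
      .getOrCreate()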


1 Answer


The option I had not considered was increasing my driver resources. Adding --driver-memory 4G --driver-cores 2 got my job to finish in about 9 minutes.
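
A quick sanity check from inside the job that the driver actually picked those values up; a minimal sketch (note that spark.driver.memory cannot be changed after the driver JVM has started, so it has to come from spark-submit or spark-defaults.conf):

    // Both keys are populated from the spark-submit flags above.
    println(spark.sparkContext.getConf.getOption("spark.driver.memory")) // expect Some(4G)
    println(spark.sparkContext.getConf.getOption("spark.driver.cores"))  // expect Some(2)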

The inner join of these two files (or using the built-in except() method) appears to put memory pressure on the executors. Partitioning on one of the key columns seems to relieve that memory pressure, but it increases the overall time because more shuffling is involved.
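
A minimal sketch of those two variations, assuming keys is a Seq[String] and arbitrarily repartitioning on its first column (except() compares entire rows rather than joining on keys, so it is only a drop-in replacement when every column matters):

    import org.apache.spark.sql.functions.col

    // Repartition both sides on one of the key columns before joining; this spreads
    // the memory pressure across executors at the cost of an extra shuffle.
    val keyCol = keys.head
    val insertsRepartitioned = current.repartition(col(keyCol))
      .join(previous.repartition(col(keyCol)), keys, "leftanti")

    // The built-in alternative mentioned above: rows of current not present in previous.
    val insertsViaExcept = current.except(previous)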

Doing the left-anti join between these two files required us to have more driver resources. I had not expected that.

answered 2017-11-30T17:08:36.077