We are trying to publish deltas from a Hive table to Kafka. The table in question is a single partition, single block file of 244 MB. Our cluster is configured with a 256M block size, so in this case we are just about at the maximum for a single file.
Each time that table is updated, a copy is archived and then we run our delta process.
In the function below we have isolated the different joins and confirmed that the inner join performs acceptably (roughly 3 minutes), but neither of the two anti-join DataFrames ever completes. We keep throwing more resources at the Spark job and still see the errors shown further down. (A diagnostic sketch for timing each join separately follows after the function.)
Is there a practical limit on the size of DataFrames for this kind of join?
private class DeltaColumnPublisher(spark: SparkSession, sink: KafkaSink, source: RegisteredDataset)
  extends BasePublisher(spark, sink, source) with Serializable {

  val deltaColumn = "hadoop_update_ts" // TODO: move to the dataset object

  def publishDeltaRun(dataLocation: String, archiveLocation: String): (Long, Long) = {
    val current = spark.read.parquet(dataLocation)
    val previous = spark.read.parquet(archiveLocation)

    // new rows: present in current but not in previous
    val inserts = current.join(previous, keys, "leftanti")
    // changed rows: keys match but the delta column differs
    val updates = current.join(previous, keys).where(current.col(deltaColumn) =!= previous.col(deltaColumn))
    // removed rows: present in previous but not in current
    val deletes = previous.join(current, keys, "leftanti")

    val upsertCounter = spark.sparkContext.longAccumulator("upserts")
    val deleteCounter = spark.sparkContext.longAccumulator("deletes")

    logInfo("sending inserts to kafka")
    sink.sendDeltasToKafka(inserts, "U", upsertCounter)

    logInfo("sending updates to kafka")
    sink.sendDeltasToKafka(updates, "U", upsertCounter)

    logInfo("sending deletes to kafka")
    sink.sendDeltasToKafka(deletes, "D", deleteCounter)

    (upsertCounter.value, deleteCounter.value)
  }
}
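For reference, here is a minimal diagnostic sketch of how each join can be materialized and timed on its own; this is the kind of check we used to confirm that the inner join finishes while the anti-joins do not. It is not part of the publishing path: the count() calls exist only to force evaluation, and the snippet assumes the same keys, deltaColumn, dataLocation and archiveLocation values that publishDeltaRun already has in scope.

import org.apache.spark.sql.{DataFrame, SparkSession}

// Diagnostic sketch only: run each join independently and time a count() so
// the slow join can be identified. All parameters mirror what
// publishDeltaRun already has in scope.
object JoinTiming {
  def run(spark: SparkSession, keys: Seq[String], deltaColumn: String,
          dataLocation: String, archiveLocation: String): Unit = {
    val current  = spark.read.parquet(dataLocation)
    val previous = spark.read.parquet(archiveLocation)

    def timed(label: String)(df: DataFrame): Unit = {
      val start = System.nanoTime()
      val n = df.count() // forces the full join to execute
      println(f"$label: $n rows in ${(System.nanoTime() - start) / 1e9}%.1f s")
    }

    timed("inner join (updates)") {
      current.join(previous, keys)
        .where(current.col(deltaColumn) =!= previous.col(deltaColumn))
    }
    timed("left anti (inserts)") { current.join(previous, keys, "leftanti") }
    timed("left anti (deletes)") { previous.join(current, keys, "leftanti") }
  }
}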
The errors we are seeing seem to indicate that the driver is losing contact with the executors. We have increased executor memory to 24G, the network timeout to as high as 900s, and the heartbeat interval to as high as 120s.
17/11/27 20:36:18 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@596e3aa6,BlockManagerId(1, server, 46292, None))] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at ...
Later in the logs:
17/11/27 20:42:37 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@25d1bd5f,BlockManagerId(1, server, 46292, None))] in 3 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at ...
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Could not find HeartbeatReceiver.
The configuration switches we have been manipulating (without success) are --executor-memory 24G --conf spark.network.timeout=900s --conf spark.executor.heartbeatInterval=120s
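As a sanity check, a minimal sketch for reading back the effective values of these settings from inside the job, assuming the application was launched with the spark-submit flags above (this only verifies that the overrides actually reached the Spark configuration, it does not change behaviour):

import org.apache.spark.sql.SparkSession

// Sanity-check sketch: print the effective values of the tuned settings.
// Assumes the application was submitted with the flags shown above.
val spark = SparkSession.builder().getOrCreate()

Seq("spark.executor.memory",
    "spark.network.timeout",
    "spark.executor.heartbeatInterval").foreach { key =>
  // getOption returns None rather than throwing when a key was never set
  println(s"$key = ${spark.conf.getOption(key).getOrElse("<default>")}")
}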