3

我开始做amp-camp 5 练习。我尝试了以下两种情况:

情景#1

val pagecounts = sc.textFile("data/pagecounts")
pagecounts.checkpoint
pagecounts.count

情景#2

val pagecounts = sc.textFile("data/pagecounts")
pagecounts.count

两种方案的 Spark shell 应用程序 UI 中显示的总时间不同。
场景 #1 耗时 0.5 秒,而场景 #2 仅耗时 0.2 秒

在场景 #1 中,检查点命令什么都不做,它既不是转换也不是动作。意思是,一旦 RDD 在某个动作之后具体化,就继续保存到磁盘。我在这里错过了什么吗?

问题:

  1. 我知道场景 #1 需要更多时间,因为 RDD 是检查点(写入磁盘)。有没有办法从总时间中知道检查点所用的时间?
    Spark shell 应用程序 UI 显示以下内容 - 调度程序延迟、任务反序列化时间、GC 时间、结果序列化时间、获取结果时间。但是,没有显示检查点的故障。

  2. 有没有办法访问上述指标,例如调度程序延迟、GC 时间并以编程方式保存它们?我想为 RDD 上调用的每个操作记录上述一些指标。

  3. 如何以编程方式访问以下信息:

    • RDD 的大小,当在检查点上持久化到磁盘时?
    • 目前内存中有多少百分比的 RDD?
    • 计算 RDD 所需的总时间?

如果您需要更多信息,请告诉我。

4

1 回答 1

2

Spark REST API几乎提供了您所要求的一切。

一些例子;

目前内存中有多少百分比的 RDD?

GET /api/v1/applications/[app-id]/storage/rdd/0

将回复:

{
  "id" : 0,
  "name" : "ParallelCollectionRDD",
  "numPartitions" : 2,
  "numCachedPartitions" : 2,
  "storageLevel" : "Memory Deserialized 1x Replicated",
  "memoryUsed" : 28000032,
  "diskUsed" : 0,
  "dataDistribution" : [ {
    "address" : "localhost:54984",
    "memoryUsed" : 28000032,
    "memoryRemaining" : 527755733,
    "diskUsed" : 0
  } ],
  "partitions" : [ {
    "blockName" : "rdd_0_0",
    "storageLevel" : "Memory Deserialized 1x Replicated",
    "memoryUsed" : 14000016,
    "diskUsed" : 0,
    "executors" : [ "localhost:54984" ]
  }, {
    "blockName" : "rdd_0_1",
    "storageLevel" : "Memory Deserialized 1x Replicated",
    "memoryUsed" : 14000016,
    "diskUsed" : 0,
    "executors" : [ "localhost:54984" ]
  } ]
}

计算 RDD 所需的总时间?

计算 RDD 也称为作业、阶段或尝试。 GET /applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary

将回复:

{
  "quantiles" : [ 0.05, 0.25, 0.5, 0.75, 0.95 ],
  "executorDeserializeTime" : [ 2.0, 2.0, 2.0, 2.0, 2.0 ],
  "executorRunTime" : [ 3.0, 3.0, 4.0, 4.0, 4.0 ],
  "resultSize" : [ 1457.0, 1457.0, 1457.0, 1457.0, 1457.0 ],
  "jvmGcTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
  "resultSerializationTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
  "memoryBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
  "diskBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
  "shuffleReadMetrics" : {
    "readBytes" : [ 340.0, 340.0, 342.0, 342.0, 342.0 ],
    "readRecords" : [ 10.0, 10.0, 10.0, 10.0, 10.0 ],
    "remoteBlocksFetched" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
    "localBlocksFetched" : [ 2.0, 2.0, 2.0, 2.0, 2.0 ],
    "fetchWaitTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
    "remoteBytesRead" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
    "totalBlocksFetched" : [ 2.0, 2.0, 2.0, 2.0, 2.0 ]
  }
}

你的问题太笼统了,我就不一一回复了。我相信 spark 必须反映的一切都反映在 REST API 中。

于 2015-10-06T07:25:48.337 回答