我正在使用appengine-mapreducecontrol.start_map()
库的简单功能来启动 mapreduce 作业。该作业成功完成并在结果页面上显示了 ~43M 映射器调用。但是,此页面没有提及 reduce 步骤或我认为仍在运行的任何底层 appengine-pipeline 流程。是否有某种方法可以返回此调用生成的管道 ID,以便我可以查看底层管道以帮助调试这个长期运行的作业?我想检索足够的信息来拉出这个页面: /mapreduce/detail?mapreduce_id=<my_id>
/mapreduce/pipeline/status?root=<guid>
这是我最初用于启动 mapreduce 作业的代码示例:
from third_party.mapreduce import control
mapreduce_id = control.start_map(
name="Backfill",
handler_spec="mark_tos_accepted",
reader_spec=(
"third_party.mapreduce.input_readers.DatastoreInputReader"),
mapper_parameters={
"input_reader": {
"entity_kind": "ModelX"
},
},
shard_count=64,
queue_name="backfill-mapreduce-queue",
)
这是映射函数:
# This is where we keep our copy of appengine-mapreduce
from third_party.mapreduce import operation as op
def mark_tos_accepted(modelx):
# Skip users who have already been marked
if (not modelx
or modelx.tos_accepted == myglobals.LAST_MATERIAL_CHANGE_TO_TOS):
return
modelx.tos_accepted = user_models.LAST_MATERIAL_CHANGE_TO_TOS
yield op.db.Put(modelx)
以下是 ModelX 的相关部分:
class BackupModel(db.Model):
backup_timestamp = db.DateTimeProperty(indexed=True, auto_now=True)
class ModelX(BackupModel):
tos_accepted = db.IntegerProperty(indexed=False, default=0)
有关更多上下文,我正在尝试调试我在数据仓库中出现的写入问题。
A
2013 年 3 月 23 日,我们在 db.Model(我们称它)上启动了一个 MapReduce 作业(我们称它为),它ModelX
有大约 4300 万个实体。7 小时后,作业“完成”,/mapreduce/detail
页面显示我们已成功映射所有实体,如下所示。
mapper-calls: 43613334 (1747.47/sec avg.)
2013 年 3 月 31 日,我们在ModelX
. 12 小时后,作业以 Success 状态完成, /mapreduce/detail
页面显示我们已成功映射所有实体,如下所示。
mapper-calls: 43803632 (964.24/sec avg.)
我知道 MR 作业 A 写入所有ModelX
实体,因为我们引入了一个新属性,以前没有任何实体包含。ModelX
包含一个像这样的 auto_add 属性。
backup_timestamp = ndb.DateTimeProperty(indexed=True, auto_now=True)
我们的数据仓库流程运行查询ModelX
以查找在某一天发生更改的那些实体,然后下载这些实体并将它们存储在单独的 (AWS) 数据库中,以便我们可以对它们进行分析。此查询的一个示例是:
db.GqlQuery('select * from ModelX where backup_timestamp >= DATETIME(2013, 4, 10, 0, 0, 0) and backup_timestamp < DATETIME(2013, 4, 11, 0, 0, 0) order by backup_timestamp')
我希望我们的数据仓库在 MR 作业完成的每一天都有大约 4300 万个实体,但实际上它更像是大约 300 万个实体,随后的每一天都显示出增加,如下面的进展所示:
3/16/13 230751
3/17/13 193316
3/18/13 344114
3/19/13 437790
3/20/13 443850
3/21/13 640560
3/22/13 612143
3/23/13 547817
3/24/13 2317784 // Why isn't this ~43M ?
3/25/13 3701792 // Why didn't this go down to ~500K again?
3/26/13 4166678
3/27/13 3513732
3/28/13 3652571
这让我觉得,尽管 mapreduce 作业发出的 op.db.Put() 调用仍在某些管道或队列中运行并导致这种涓流效应。
此外,如果我查询带有 old 的实体backup_timestamp
,我可以返回很远并且仍然获得大量实体,但我希望所有这些查询都返回 0:
In [4]: ModelX.all().filter('backup_timestamp <', 'DATETIME(2013,2,23,1,1,1)').count()
Out[4]: 1000L
In [5]: ModelX.all().filter('backup_timestamp <', 'DATETIME(2013,1,23,1,1,1)').count()
Out[5]: 1000L
In [6]: ModelX.all().filter('backup_timestamp <', 'DATETIME(2012,1,23,1,1,1)').count()
Out[6]: 1000L
但是,在查询返回不应返回的实体时会出现这种奇怪的行为:
In [8]: old = ModelX.all().filter('backup_timestamp <', 'DATETIME(2012,1,1,1,1,1)')
In [9]: paste
for o in old[1:100]:
print o.backup_timestamp
## -- End pasted text --
2013-03-22 22:56:03.877840
2013-03-22 22:56:18.149020
2013-03-22 22:56:19.288400
2013-03-22 22:56:31.412290
2013-03-22 22:58:37.710790
2013-03-22 22:59:14.144200
2013-03-22 22:59:41.396550
2013-03-22 22:59:46.482890
2013-03-22 22:59:46.703210
2013-03-22 22:59:57.525220
2013-03-22 23:00:03.864200
2013-03-22 23:00:18.040840
2013-03-22 23:00:39.636020
这让我觉得索引只是需要很长时间才能更新。
我还绘制了我们的数据仓库下载的实体数量,并注意到一些类似悬崖的下降,这让我认为在某个地方发生了一些幕后的节流,而我在任何暴露的诊断工具中都看不到appengine 仪表板。例如,这张图显示了 3/23 上的一个相当大的峰值,当我们开始 mapreduce 作业时,但此后不久急剧下降。
BackupTimestamp
GqlQuery
此图显示每天每 10 分钟间隔返回的实体计数。请注意,当 MapReduce 作业启动时,紫色线显示了一个巨大的峰值,然后在大约 1 小时后随着节流开始而急剧下降。该图还显示,似乎存在一些基于时间的节流。