2

我正在使用appengine-mapreducecontrol.start_map()库的简单功能来启动 mapreduce 作业。该作业成功完成并在结果页面上显示了 ~43M 映射器调用。但是,此页面没有提及 reduce 步骤或我认为仍在运行的任何底层 appengine-pipeline 流程。是否有某种方法可以返回此调用生成的管道 ID,以便我可以查看底层管道以帮助调试这个长期运行的作业?我想检索足够的信息来拉出这个页面: /mapreduce/detail?mapreduce_id=<my_id>/mapreduce/pipeline/status?root=<guid>

这是我最初用于启动 mapreduce 作业的代码示例:

from third_party.mapreduce import control
mapreduce_id = control.start_map(
    name="Backfill",
    handler_spec="mark_tos_accepted",
    reader_spec=(
        "third_party.mapreduce.input_readers.DatastoreInputReader"),
    mapper_parameters={
        "input_reader": {
            "entity_kind": "ModelX"
        },
    },
    shard_count=64,
    queue_name="backfill-mapreduce-queue",
 )

这是映射函数:

# This is where we keep our copy of appengine-mapreduce
from third_party.mapreduce import operation as op

def mark_tos_accepted(modelx):
    # Skip users who have already been marked
    if (not modelx
        or modelx.tos_accepted == myglobals.LAST_MATERIAL_CHANGE_TO_TOS):
    return

    modelx.tos_accepted = user_models.LAST_MATERIAL_CHANGE_TO_TOS
    yield op.db.Put(modelx)

以下是 ModelX 的相关部分:

class BackupModel(db.Model):
    backup_timestamp = db.DateTimeProperty(indexed=True, auto_now=True)

class ModelX(BackupModel):
    tos_accepted = db.IntegerProperty(indexed=False, default=0)

有关更多上下文,我正在尝试调试我在数据仓库中出现的写入问题。

A2013 年 3 月 23 日,我们在 db.Model(我们称它)上启动了一个 MapReduce 作业(我们称它为),它ModelX有大约 4300 万个实体。7 小时后,作业“完成”,/mapreduce/detail页面显示我们已成功映射所有实体,如下所示。

mapper-calls: 43613334 (1747.47/sec avg.)

2013 年 3 月 31 日,我们在ModelX. 12 小时后,作业以 Success 状态完成, /mapreduce/detail页面显示我们已成功映射所有实体,如下所示。

mapper-calls: 43803632 (964.24/sec avg.)

我知道 MR 作业 A 写入所有ModelX实体,因为我们引入了一个新属性,以前没有任何实体包含。ModelX包含一个像这样的 auto_add 属性。

backup_timestamp = ndb.DateTimeProperty(indexed=True, auto_now=True)

我们的数据仓库流程运行查询ModelX以查找在某一天发生更改的那些实体,然后下载这些实体并将它们存储在单独的 (AWS) 数据库中,以便我们可以对它们进行分析。此查询的一个示例是:

db.GqlQuery('select * from ModelX where backup_timestamp >= DATETIME(2013, 4, 10, 0, 0, 0) and backup_timestamp < DATETIME(2013, 4, 11, 0, 0, 0) order by backup_timestamp')

我希望我们的数据仓库在 MR 作业完成的每一天都有大约 4300 万个实体,但实际上它更像是大约 300 万个实体,随后的每一天都显示出增加,如下面的进展所示:

3/16/13 230751
3/17/13 193316
3/18/13 344114
3/19/13 437790
3/20/13 443850
3/21/13 640560
3/22/13 612143
3/23/13 547817
3/24/13 2317784  // Why isn't this ~43M ?
3/25/13 3701792  // Why didn't this go down to ~500K again?
3/26/13 4166678
3/27/13 3513732
3/28/13 3652571

这让我觉得,尽管 mapreduce 作业发出的 op.db.Put() 调用仍在某些管道或队列中运行并导致这种涓流效应。

此外,如果我查询带有 old 的实体backup_timestamp,我可以返回很远并且仍然获得大量实体,但我希望所有这些查询都返回 0:

In [4]: ModelX.all().filter('backup_timestamp <', 'DATETIME(2013,2,23,1,1,1)').count()
Out[4]: 1000L

In [5]: ModelX.all().filter('backup_timestamp <', 'DATETIME(2013,1,23,1,1,1)').count()
Out[5]: 1000L

In [6]: ModelX.all().filter('backup_timestamp <', 'DATETIME(2012,1,23,1,1,1)').count()
Out[6]: 1000L

但是,在查询返回不应返回的实体时会出现这种奇怪的行为:

In [8]: old = ModelX.all().filter('backup_timestamp <', 'DATETIME(2012,1,1,1,1,1)')

In [9]: paste
for o in old[1:100]:
  print o.backup_timestamp
## -- End pasted text --
2013-03-22 22:56:03.877840
2013-03-22 22:56:18.149020
2013-03-22 22:56:19.288400
2013-03-22 22:56:31.412290
2013-03-22 22:58:37.710790
2013-03-22 22:59:14.144200
2013-03-22 22:59:41.396550
2013-03-22 22:59:46.482890
2013-03-22 22:59:46.703210
2013-03-22 22:59:57.525220
2013-03-22 23:00:03.864200
2013-03-22 23:00:18.040840
2013-03-22 23:00:39.636020

这让我觉得索引只是需要很长时间才能更新。

我还绘制了我们的数据仓库下载的实体数量,并注意到一些类似悬崖的下降,这让我认为在某个地方发生了一些幕后的节流,而我在任何暴露的诊断工具中都看不到appengine 仪表板。例如,这张图显示了 3/23 上的一个相当大的峰值,当我们开始 mapreduce 作业时,但此后不久急剧下降。

BackupTimestamp GqlQuery此图显示每天每 10 分钟间隔返回的实体计数。请注意,当 MapReduce 作业启动时,紫色线显示了一个巨大的峰值,然后在大约 1 小时后随着节流开始而急剧下降。该图还显示,似乎存在一些基于时间的节流。

每天每 10 分钟增量的数据存储读取次数

4

3 回答 3

2

I don't think you'll have any reducer functions there, because all you've done is start a mapper. To do a complete mapreduce, you have to explicitly instantiate a MapReducePipeline and call start on it. As a bonus, that answers your question, as it returns the pipeline ID which you can then use in the status URL.

于 2013-05-10T21:23:57.993 回答
0

control.start_map 不使用管道,也没有 shuffle/reduce 步骤。当 mapreduce 状态页面显示完成时,所有与 mapreduce 相关的任务队列任务应该已经完成​​。您可以检查您的队列,甚至暂停它。

我怀疑旧模型的旧索引或最终一致性存在问题。要调试 MR,过滤警告/错误日志并按 mr id 搜索很有用。为了帮助您解决特定情况,查看您的 Map 处理程序可能会很有用。

于 2013-05-21T23:55:17.930 回答
0

只是想了解具体问题。您是否期望 AWS 数据库中有更多的实体?我怀疑问题出在将旧 ModelX 实体下载到 AWS 数据库的过程中,它以某种方式无法捕获所有更新的实体。

AWS 下载过程是否以任何方式修改 ModelX?如果不是,那您为什么会惊讶于发现具有旧modified时间戳的实体?modified只会在写入时更新,而不是在读取操作时更新。

有点不相关——关于节流,我通常发现一个节流的任务队列是问题所在,所以也许检查一下你的任务有多长时间,或者你的应用程序是否由于在其他地方发生的大量错误而被节流。

于 2013-05-21T18:09:31.810 回答