5

我们正在尝试在我们的项目中大量使用MapReduce 。现在我们有这个问题,有很多'InternalError:内部错误'。日志中的错误...

它的一个例子:

    "POST /mapreduce/worker_callback HTTP/1.1" 500 0 "http://appname/mapreduce/worker_callback" "AppEngine-Google; 
(+http://code.google.com/appengine)" "appname.appspot.com" ms=18856 cpu_ms=15980 
queue_name=default task_name=appengine-mrshard-15828822618486744D69C-11-195 
instance=00c61b117c47e0cba49bc5e5c7f9d328693e95ce
W 2012-10-24 06:51:27.140
suspended generator _put_tasklet(context.py:274) raised InternalError(internal error.)
W 2012-10-24 06:51:27.153
suspended generator put(context.py:703) raised InternalError(internal error.)
E 2012-10-24 06:51:27.207
internal error.
Traceback (most recent call last):
  File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1511, in __call__
    rv = self.handle_exception(request, response, e)
  File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1505, in __call__
    rv = self.router.dispatch(request, response)
  File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1253, in default_dispatcher
    return route.handler_adapter(request, response)
  File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1077, in __call__
    return handler.dispatch()
  File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 547, in dispatch
    return self.handle_exception(e, self.app.debug)
  File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 545, in dispatch
    return method(*args, **kwargs)
  File "/base/data/home/apps/s~appname/1.362664407983567993/mapreduce/base_handler.py", line 65, in post
    self.handle()
  File "/base/data/home/apps/s~appname/1.362664407983567993/mapreduce/handlers.py", line 208, in handle
    ctx.flush()
  File "/base/data/home/apps/s~appname/1.362664407983567993/mapreduce/context.py", line 333, in flush
    pool.flush()
  File "/base/data/home/apps/s~appname/1.362664407983567993/mapreduce/context.py", line 221, in flush
    self.__flush_ndb_puts()
  File "/base/data/home/apps/s~appname/1.362664407983567993/mapreduce/context.py", line 239, in __flush_ndb_puts
    ndb.put_multi(self.ndb_puts.items, config=self.__create_config())
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/model.py", line 3650, in put_multi
    for future in put_multi_async(entities, **ctx_options)]
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/tasklets.py", line 325, in get_result
    self.check_success()
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/tasklets.py", line 368, in _help_tasklet_along
    value = gen.throw(exc.__class__, exc, tb)
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/context.py", line 703, in put
    key = yield self._put_batcher.add(entity, options)
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/tasklets.py", line 368, in _help_tasklet_along
    value = gen.throw(exc.__class__, exc, tb)
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/context.py", line 274, in _put_tasklet
    keys = yield self._conn.async_put(options, datastore_entities)
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/tasklets.py", line 454, in _on_rpc_completion
    result = rpc.get_result()
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 834, in get_result
    result = rpc.get_result()
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/api/apiproxy_stub_map.py", line 604, in get_result
    return self.__get_result_hook(self)
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 1569, in __put_hook
    self.check_rpc_success(rpc)
  File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 1224, in check_rpc_success
    raise _ToDatastoreError(err)
InternalError: internal error.

队列.yaml:

queue:
- name: default
  rate: 500/s
  bucket_size: 100
  max_concurrent_requests: 400
  retry_parameters:
    min_backoff_seconds: 5
    max_backoff_seconds: 120
    max_doublings: 2

MapReduce 映射器参数:

'shard_count': 16,
'processing_rate': 200,
'batch_size': 20
 we would like to increase these numbers, since we need more speed in processing, but once we try to increase it increases error rate...

Blobstore 文件数:几个(其中一些包含数百万行)

前端实例类: F4

处理流程:

  1. 我们仅使用映射器进行此特定处理。
  2. 我们使用 BlobstoreLineInputReader(blob 包含文本文件)。
  3. 每行代表我们需要创建的新条目,如果它不存在(其中一些我们更新)。

我的问题是:

  • 我们怎样才能避免这些错误呢?
  • 关于我们如何选择/平衡映射器参数(shard_count、processing_rate、batch_size)是否有任何提示/提示?
  • 工作会发生什么,它会被重试(如果是,我们如何控制它?)还是不重试?

顺便说一句,我们尝试使用此处提供的一些建议(控制 batch_size),但我们仍然看到这一点。

4

1 回答 1

0

这看起来像一个超时错误 - 检查您的日志以查看该进程在此之前运行了多长时间。

如果是,您应该尝试减少调用 put_multi() 的项目数量(即减少批量大小)并添加计时器检查,以便当每次 put_multi() 调用的平均时间接近处理时间时限制你退出,让另一个开始。

于 2013-10-11T10:52:12.453 回答