2

为 input_reader 使用新的 Google App Engine MapReduce 库过滤器我想知道如何按 ndb.Key 过滤。

我阅读了这篇文章,并且在过滤器元组中使用过 datetime、string、int、float,但是如何通过 ndb.Key 过滤?

当我尝试按 ndb.Key 过滤时,出现此错误:

BadReaderParamsError: Expected Key, got u"Key('Clients', 406)"

或者这个错误:

TypeError: Key('Clients', 406) is not JSON serializable

我试图传递 ndb.Key 对象和 ndb.Key 的字符串表示形式。

这是我的两个过滤器元组:

样品 1:

input_reader': {
  'input_reader': 'mapreduce.input_readers.DatastoreInputReader',
  'entity_kind': 'model.Sales',
  'filters': [("client","=", ndb.Key('Clients', 406))]
}

样本 2:

input_reader': {
  'input_reader': 'mapreduce.input_readers.DatastoreInputReader',
  'entity_kind': 'model.Sales',
  'filters': [("client","=", "%s" % ndb.Key('Clients', 406))]
}
4

4 回答 4

2

这有点棘手。

如果您查看Google Code 上的代码,您会看到它mapreduce.model定义了一个JSON_DEFAULTSdict,它确定在 JSON 序列化/反序列化中获得特殊情况处理的类:默认情况下,只是 datetime。因此,您可以将类猴子修补ndb.Key到那里,并为其提供执行序列化/反序列化的功能 - 例如:

from mapreduce import model

def _JsonEncodeKey(o):
  """Json encode an ndb.Key object."""
  return {'key_string': o.urlsafe()}

def _JsonDecodeKey(d):
  """Json decode a ndb.Key object."""
  return ndb.Key(urlsafe=d['key_string'])

model.JSON_DEFAULTS[ndb.Key] = (_JsonEncodeKey, _JsonDecodeKey)
model._TYPE_IDS['Key'] = ndb.Key

您可能还需要重复最后两行来修补mapreduce.lib.pipeline.util

另请注意,如果您这样做,您需要确保它在运行 mapreduce 任何部分的任何实例上运行:最简单的方法是编写一个导入上述注册代码的包装脚本,以及mapreduce.main.APP,并覆盖mapreduce您的 app.yaml 中的 URL 以指向您的包装器。

于 2013-05-07T16:13:19.577 回答
1

制作自己的输入阅读器DatastoreInputReader,它知道如何解码基于键的过滤器:

class DatastoreKeyInputReader(input_readers.DatastoreKeyInputReader):
    """Augment the base input reader to accommodate ReferenceProperty filters"""
    def __init__(self, *args, **kwargs):
        try:
            filters = kwargs['filters']
            decoded = []
            for f in filters:
                value = f[2]
                if isinstance(value, list):
                    value = db.Key.from_path(*value)
                decoded.append((f[0], f[1], value))
            kwargs['filters'] = decoded
        except KeyError:
            pass

        super(DatastoreKeyInputReader, self).__init__(*args, **kwargs)

在将过滤器作为选项传递之前,在过滤器上运行此函数:

def encode_filters(filters):
    if filters is not None:
        encoded = []
        for f in filters:
            value = f[2]
            if isinstance(value, db.Model):
                value = value.key()
            if isinstance(value, db.Key):
                value = value.to_path()
            entry = (f[0], f[1], value)
            encoded.append(entry)
        filters = encoded

    return filters
于 2013-05-07T16:11:46.900 回答
1

我遇到了同样的问题,并想出了一个使用计算属性的解决方法。

您可以使用 Key id 向您的销售模型添加新的 ndb.ComputedProperty。ID 只是字符串,所以不会有任何 JSON 问题。

client_id = ndb.ComputedProperty(lambda self: self.client.id())

然后将该条件添加到您的 mapreduce 查询过滤器

input_reader': {
  'input_reader': 'mapreduce.input_readers.DatastoreInputReader',
  'entity_kind': 'model.Sales',
  'filters': [("client_id","=", '406']
}

唯一的缺点是 Computed 属性在调用 put() 参数之前不会被索引和存储,因此您必须遍历所有 Sales 实体并保存它们:

for sale in Sales.query().fetch():
  sale.put()
于 2013-10-01T09:13:42.320 回答
1

你知道 to_old_key() 和 from_old_key() 方法吗?

于 2013-05-08T14:52:08.597 回答