6

我刚刚观看了 Google I/O 2010 的 App Engine 会话的批处理数据处理,阅读了Google Research的 MapReduce 文章的部分内容,现在我正在考虑在 Google App Engine 上使用 MapReduce来在 Python 中实现推荐系统。

我更喜欢使用 appengine-mapreduce 而不是 Task Queue API,因为前者提供了对某种类型的所有实例的轻松迭代、自动批处理、自动任务链接等。问题是:我的推荐系统需要计算两个不同模型实例之间的相关性,即两种不同类型的实例。

示例:我有这两个模型:用户和项目。每个都有一个标签列表作为属性。以下是计算用户和项目之间相关性的函数。请注意,calculateCorrelation应该为用户和项目的每个组合调用:

def calculateCorrelation(user, item):
    return calculateCorrelationAverage(u.tags, i.tags)

def calculateCorrelationAverage(tags1, tags2):
    correlationSum = 0.0
    for (tag1, tag2) in allCombinations(tags1, tags2):
        correlationSum += correlation(tag1, tag2)
    return correlationSum / (len(tags1) + len(tags2))

def allCombinations(list1, list2):
    combinations = []
    for x in list1:
        for y in list2:
            combinations.append((x, y))
    return combinations             

但这calculateCorrelation不是 appengine-mapreduce 中的有效 Mapper,而且这个函数甚至可能与 MapReduce 计算概念不兼容。然而,我需要确定……拥有 appengine-mapreduce 的优势,比如自动批处理和任务链,对我来说真的很棒。

有什么解决办法吗?

我应该定义自己的 InputReader 吗?读取两种不同类型的所有实例的新 InputReader 是否与当前的 appengine-mapreduce 实现兼容?

或者我应该尝试以下方法?

  • 将这两种所有实体的所有键两两组合成一个新模型的实例(可能使用 MapReduce)
  • 使用映射器对这个新模型的实例进行迭代
  • 对于每个实例,使用其中的键来获取不同种类的两个实体并计算它们之间的相关性。
4

2 回答 2

3

按照 Nick Johnson 的建议,我编写了自己的 InputReader。该阅读器从两种不同的类型中获取实体。它产生具有这些实体的所有组合的元组。这里是:

class TwoKindsInputReader(InputReader):
    _APP_PARAM = "_app"
    _KIND1_PARAM = "kind1"
    _KIND2_PARAM = "kind2"
    MAPPER_PARAMS = "mapper_params"

    def __init__(self, reader1, reader2):
        self._reader1 = reader1
        self._reader2 = reader2

    def __iter__(self):
        for u in self._reader1:
            for e in self._reader2:
                yield (u, e)

    @classmethod
    def from_json(cls, input_shard_state):
        reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM])
        reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM])

        return cls(reader1, reader2)

    def to_json(self):
        json_dict = {}
        json_dict[self._KIND1_PARAM] = self._reader1.to_json()
        json_dict[self._KIND2_PARAM] = self._reader2.to_json()
        return json_dict

    @classmethod
    def split_input(cls, mapper_spec):
        params = mapper_spec.params
        app = params.get(cls._APP_PARAM)
        kind1 = params.get(cls._KIND1_PARAM)
        kind2 = params.get(cls._KIND2_PARAM)
        shard_count = mapper_spec.shard_count
        shard_count_sqrt = int(math.sqrt(shard_count))

        splitted1 = DatastoreInputReader._split_input_from_params(app, kind1, params, shard_count_sqrt)
        splitted2 = DatastoreInputReader._split_input_from_params(app, kind2, params, shard_count_sqrt)
        inputs = []

        for u in splitted1:
            for e in splitted2:
                inputs.append(TwoKindsInputReader(u, e))

        #mapper_spec.shard_count = len(inputs) #uncomment this in case of "Incorrect number of shard states" (at line 408 in handlers.py)
        return inputs

    @classmethod
    def validate(cls, mapper_spec):
        return True #TODO

当您需要处理两种实体的所有组合时,应使用此代码。您还可以将其概括为两种以上。

这是一个有效的 mapreduce.yaml TwoKindsInputReader

mapreduce:
- name: recommendationMapReduce
  mapper:
    input_reader: customInputReaders.TwoKindsInputReader
    handler: recommendation.calculateCorrelationHandler
    params:
    - name: kind1
      default: kinds.User
    - name: kind2
      default: kinds.Item
    - name: shard_count
      default: 16
于 2010-10-20T13:38:48.540 回答
2

如果没有您实际计算的更多详细信息,很难知道要推荐什么。一个简单的选择是在 map 调用中简单地获取相关实体 - 没有什么可以阻止您在那里进行数据存储操作。

但是,这将导致很多小调用。按照您的建议,编写自定义 InputReader 将允许您并行获取两组实体,这将显着提高性能。

如果您提供有关如何加入这些实体的更多详细信息,我们或许能够提供更具体的建议。

于 2010-09-22T12:27:38.413 回答