2

只需将连接字段作为 reducer 键发送即可通过单个键轻松连接数据集。但是通过几个键连接记录,其中至少一个应该是相同的,对我来说并不容易。

示例 我有日志,我想按用户参数对它们进行分组,我想通过 (ipAddress, sessionId,visitorCockies) 加入它们

因此,如果 log1.ip == log2.ip OR log1.session = log2.session OR log1.cockie = log2.coockie,则 log1 应与 log2 分组。也许可以创建复合键或诸如 minHash 之类的概率方法...

可能吗?

4

5 回答 5

0

The issue is that MapReduce joins are typically implemented by giving records that match on some field the same reduce key so that they get sent to the same reducer. So anything to get around this is going to be a bit of a hack, but it is possible...

Here's what I would recommend: for each input record, generate three copies, each with a new "key" field that is prefixed by the field it's coming from. So for example, say you had the following input:

(ip=1.2.3.4, session=ABC, cookie=123)
(ip=3.4.5.6, session=DEF, cookie=456)

Then you would generate

(ip=1.2.3.4, session=ABC, cookie=123, key=ip_1.2.3.4)
(ip=1.2.3.4, session=ABC, cookie=123, key=session_ABC)
(ip=1.2.3.4, session=ABC, cookie=123, key=cookie_123)
(ip=3.4.5.6, session=DEF, cookie=456, key=ip_3.4.5.6)
(ip=3.4.5.6, session=DEF, cookie=456, key=session_DEF)
(ip=3.4.5.6, session=DEF, cookie=456, key=cookie_456)

And then you could simply group on this new field.

I'm not too familiar with scalding/cascading (although I've been meaning to learn more about it) but this would definitely conform to how joins are generally done in Hadoop.

于 2012-09-24T22:53:01.797 回答
0

提示:使用类型别名使您的 Scalding 代码易于阅读

注意 0:这个解决方案特别好,因为它总是只有 1 个映射作业,即使有更多的键可以加入。

注意 1:假设每个管道没有重复的键,否则您必须使 'key 也具有它来自哪个日志的索引,并且 mapTo 将是一个 flatMapTo 并且更复杂一些。

注意 2:为简单起见,这将丢弃连接字段,为了保留它们,您需要一个丑陋的大元组(ip1、ip2、session1、session2、...等)。如果您真的想要,我可以写一个保留它们的示例。

注意 3:如果你真的想合并重复的值,你可以用 groupBy 每个 logEntry1 和 logEntry2,产生一个 logEntryList,然后是 cat(如评论中所述,这对于连接是不正常的)。这将创建另外 2 个映射作业。

type String2 = (String, String)
type String3 = (String, String, String)

def addKey(log: Pipe): Pipe = log.flatMap[String3, String](('ip, 'session, 'cookie) -> 'key)(
  _.productIterator.toList.zipWithIndex.map {
    case (key: String, index: Int) => index.toString + key
  }
)

(addKey(log1) ++ addKey(log2)).groupBy('key)(_.toList[String]('logEntry -> 'group))
.mapTo[Iterable[String], String2]('group -> ('logEntry1, 'logEntry2))(list => (list.head, list.last))
于 2013-11-30T11:47:00.423 回答
0

按照上面 Joe 的描述创建单独的连接后,您需要删除重复项。如果您在“OR-join”中使用的所有字段中它们都相等,则数据中的两个元组是重复的。因此,如果您之后对代表所有相关字段的键进行自然连接,您会将所有重复项组合在一起。因此,您可以通过单个出现的相应元组来替换它们。

让我们看一个示例:假设您有包含字段 (A,B,C,D) 的元组,并且您感兴趣的字段是 A、B 和 C。您将首先对 A、B 进行等值连接, 和 C 分开。对于每一个,您都将加入初始元组流。用 (A0, B0, C0, D0) 表示第一个流,用 (A1, B1, C1, D1) 表示第二个流。结果将是元组(A0、B0、C0、D0、A1、B1、C1、D1)。对于这些元组中的每一个,您将创建一个元组 (A0A1B0B1C0C1, A0, B0, C0, D0, A1, B1, C1, D1),因此所有重复项将在随后的 reducer 中组合在一起。对于每个组,只返回一个包含的元组。

于 2012-09-26T04:21:54.617 回答
0

对于级联,我最终创建了一个过滤器,它检查 OR 中任何条件的输出是否为真。级联过滤器输出可以选择使用的真/假值。

于 2013-04-16T19:41:05.443 回答
0

你能描述更多关于“通过几个键加入记录”的信息吗?

如果您知道工作流程中可以连接特定键的点,那么最好的方法可能是定义一个具有多个连接的流,而不是尝试操作一个复杂的数据结构,以便一步解决 N 个键。

这是一个示例应用程序,它展示了如何在 Cascading 中处理不同类型的连接:https ://github.com/Cascading/CoPA

于 2012-11-18T17:35:50.377 回答