与我的CouchDB问题有关。
谁能用麻木的人可以理解的方式来解释 MapReduce?
一直深入到 Map 和 Reduce 的基础知识。
Map是一个函数,它将某种列表中的项目“转换”为另一种项目并将它们放回同一种列表中。
假设我有一个数字列表:[1,2,3] 并且我想将每个数字加倍,在这种情况下,“将每个数字加倍”的函数是函数 x = x * 2。如果没有映射,我可以写一个简单的循环,比如说
A = [1, 2, 3]
foreach (item in A) A[item] = A[item] * 2
我会有 A = [2, 4, 6] 但不是写循环,如果我有一个 map 函数,我可以写
A = [1, 2, 3].Map(x => x * 2)
x => x * 2 是针对 [1,2,3] 中的元素执行的函数。发生的情况是程序获取每个项目,通过使 x 等于每个项目来针对它执行 (x => x * 2),并生成结果列表。
1 : 1 => 1 * 2 : 2
2 : 2 => 2 * 2 : 4
3 : 3 => 3 * 2 : 6
因此,在使用 (x => x * 2) 执行 map 函数后,您将拥有 [2, 4, 6]。
Reduce是一个函数,它“收集”列表中的项目并对所有项目执行一些计算,从而将它们减少为单个值。
求和或求平均值都是 reduce 函数的实例。例如,如果您有一个数字列表,例如 [7, 8, 9] 并且您希望将它们相加,您将编写一个这样的循环
A = [7, 8, 9]
sum = 0
foreach (item in A) sum = sum + A[item]
但是,如果您可以访问 reduce 函数,则可以这样编写
A = [7, 8, 9]
sum = A.reduce( 0, (x, y) => x + y )
现在有点令人困惑,为什么传递了 2 个参数(0 和带有 x 和 y 的函数)。为了使reduce函数有用,它必须能够获取2个项目,计算一些东西并将这2个项目“减少”为一个单一的值,因此程序可以减少每一对,直到我们有一个单一的值。
执行如下:
result = 0
7 : result = result + 7 = 0 + 7 = 7
8 : result = result + 8 = 7 + 8 = 15
9 : result = result + 9 = 15 + 9 = 24
但是你不想一直从零开始,所以第一个参数可以让你指定一个种子值,特别是第一result =
行中的值。
假设您想对 2 个列表求和,它可能如下所示:
A = [7, 8, 9]
B = [1, 2, 3]
sum = 0
sum = A.reduce( sum, (x, y) => x + y )
sum = B.reduce( sum, (x, y) => x + y )
或者您更可能在现实世界中找到的版本:
A = [7, 8, 9]
B = [1, 2, 3]
sum_func = (x, y) => x + y
sum = A.reduce( B.reduce( 0, sum_func ), sum_func )
它在数据库软件中是一件好事,因为有了 Map\Reduce 支持,您可以使用数据库,而无需知道数据如何存储在数据库中以使用它,这就是数据库引擎的用途。
您只需要能够通过向引擎提供 Map 或 Reduce 函数来“告诉”引擎您想要什么,然后 DB 引擎就可以找到围绕数据的方式,应用您的函数,并得出您想要的结果在你不知道它如何循环所有记录的情况下想要所有。
有索引、键、连接和视图以及单个数据库可以保存的许多东西,因此通过保护您免受数据实际存储方式的影响,您的代码更易于编写和维护。
并行编程也是如此,如果您只指定要对数据执行的操作而不是实际实现循环代码,那么底层基础架构可以“并行化”并在同时并行循环中执行您的函数。
MapReduce 是一种并行处理大量数据的方法,无需开发人员编写除 mapper 和 reduce 函数之外的任何其他代码。
map函数将数据输入并生成结果,该结果保存在屏障中。该功能可以与大量相同的地图任务并行运行。然后可以将数据集缩减为标量值。
所以如果你把它想象成一个 SQL 语句
SELECT SUM(salary)
FROM employees
WHERE salary > 1000
GROUP by deptname
我们可以使用map来获取薪水 > 1000 的员工子集,该子集将 map 发送到障碍到组大小的桶中。
Reduce将对每个组求和。给你你的结果集。
刚刚从我的谷歌论文的大学学习笔记中摘录了这个
第 2 步是地图。第 3 步是减少。
例如,
MapReduce 在 Map 和 Reduce 之间拆分的原因是因为可以轻松地并行完成不同的部分。(特别是如果 Reduce 具有某些数学属性。)
有关 MapReduce 的复杂但良好的描述,请参阅:Google 的 MapReduce 编程模型 - 重温 (PDF)。
MAP 和 REDUCE 是人类杀死最后一只恐龙时的旧 Lisp 函数。
想象一下,您有一个城市列表,其中包含有关名称、居住在那里的人数和城市规模的信息:
(defparameter *cities*
'((a :people 100000 :size 200)
(b :people 200000 :size 300)
(c :people 150000 :size 210)))
现在您可能想要找到人口密度最高的城市。
首先,我们使用 MAP 创建一个城市名称和人口密度列表:
(map 'list
(lambda (city)
(list (first city)
(/ (getf (rest city) :people)
(getf (rest city) :size))))
*cities*)
=> ((A 500) (B 2000/3) (C 5000/7))
使用 REDUCE,我们现在可以找到人口密度最大的城市。
(reduce (lambda (a b)
(if (> (second a) (second b))
a
b))
'((A 500) (B 2000/3) (C 5000/7)))
=> (C 5000/7)
结合这两部分,我们得到以下代码:
(reduce (lambda (a b)
(if (> (second a) (second b))
a
b))
(map 'list
(lambda (city)
(list (first city)
(/ (getf (rest city) :people)
(getf (rest city) :size))))
*cities*))
先介绍一下函数:
(defun density (city)
(list (first city)
(/ (getf (rest city) :people)
(getf (rest city) :size))))
(defun max-density (a b)
(if (> (second a) (second b))
a
b))
然后我们可以将我们的 MAP REDUCE 代码编写为:
(reduce 'max-density
(map 'list 'density *cities*))
=> (C 5000/7)
它调用MAP
and REDUCE
(评估是由内而外的),所以它被称为map reduce。
让我们以谷歌论文中的例子为例。MapReduce 的目标是能够有效地使用大量并行工作的处理单元以用于某种算法。示例如下:您要提取一组文档中的所有单词及其计数。
典型实现:
for each document
for each word in the document
get the counter associated to the word for the document
increment that counter
end for
end for
MapReduce 实现:
Map phase (input: document key, document)
for each word in the document
emit an event with the word as the key and the value "1"
end for
Reduce phase (input: key (a word), an iterator going through the emitted values)
for each value in the iterator
sum up the value in a counter
end for
围绕这一点,您将拥有一个主程序,它将文档集划分为“拆分”,这将在 Map 阶段并行处理。发出的值由工作人员写入特定于工作人员的缓冲区中。然后,一旦通知缓冲区已准备好处理,主程序就会委托其他工作人员执行 Reduce 阶段。
每个工作器输出(作为 Map 或 Reduce 工作器)实际上是存储在分布式文件系统(Google 的 GFS)或 CouchDB 的分布式数据库中的文件。
MapReduce 的一个非常简单、快速和“傻瓜式”的介绍可在以下网址找到:http ://www.marcolotz.com/?p=67
贴出部分内容:
首先,为什么最初创建 MapReduce?
基本上,谷歌需要一种解决方案,使大型计算工作易于并行化,允许数据分布在通过网络连接的多台机器上。除此之外,它还必须以透明的方式处理机器故障并管理负载平衡问题。
MapReduce 的真正优势是什么?
有人可能会说 MapReduce 魔术是基于 Map 和 Reduce 函数应用程序。我必须承认,我非常不同意。MapReduce 如此受欢迎的主要特点是其自动并行化和分布的能力,以及简单的界面。这些因素加上对大多数错误的透明故障处理使这个框架如此受欢迎。
在纸上更深入一点:
MapReduce 最初是在 Google 的一篇论文(Dean & Ghemawat,2004 - 此处链接)中提到的,它是一种使用并行方法和商品计算机集群在大数据中进行计算的解决方案。与用 Java 编写的 Hadoop 相比,Google 的框架是用 C++ 编写的。该文档描述了并行框架如何使用大型数据集上的函数式编程中的 Map 和 Reduce 函数。
在这个解决方案中,将有两个主要步骤——称为 Map 和 Reduce——,在第一个和第二个之间有一个可选步骤——称为 Combine。Map 步骤将首先运行,在输入键值对中进行计算并生成新的输出键值。必须记住,输入键值对的格式不一定与输出格式对匹配。Reduce 步骤将组装同一键的所有值,并对其执行其他计算。结果,这最后一步将输出键值对。MapReduce 最简单的应用之一是实现字数统计。
此应用程序的伪代码如下所示:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, “1”);
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
可以注意到,映射读取记录中的所有单词(在这种情况下,记录可以是一行)并将单词作为键发出,数字 1 作为值发出。稍后,reduce 将对同一键的所有值进行分组。举个例子:假设“house”这个词在记录中出现了 3 次。reducer 的输入是 [house,[1,1,1]]。在 reducer 中,它将键 house 的所有值相加,并给出以下键值作为输出:[house,[3]]。
这是在 MapReduce 框架中的样子:
作为 MapReduce 应用程序的其他一些经典示例,可以说:
• URL 访问频率计数
•反向网络链接图
•分布式Grep
•每个主机的术语向量
为了避免过多的网络流量,论文描述了框架应该如何尝试维护数据局部性。这意味着它应该始终尝试确保运行 Map 作业的机器在其内存/本地存储中拥有数据,避免从网络中获取数据。为了减少映射器的网络吞吐量,使用了前面描述的可选组合器步骤。组合器在给定机器上对映射器的输出进行计算,然后将其发送到减速器——可能在另一台机器上。
该文档还描述了框架元素在出现故障时应如何表现。这些元素在论文中被称为工人和主人。它们将在开源实现中分为更具体的元素。由于谷歌只在论文中描述了该方法,并没有发布其专有软件,因此创建了许多开源框架来实现该模型。例如,可以说 Hadoop 或 MongoDB 中有限的 MapReduce 功能。
运行时应该处理非专业程序员的细节,比如划分输入数据、在大量机器上调度程序执行、处理机器故障(当然是以透明的方式)和管理机器间通信. 有经验的用户可以调整这些参数,因为输入数据将如何在工作人员之间进行分区。
关键概念:
•<strong>容错:它必须优雅地容忍机器故障。为了执行此操作,master 会定期对 worker 执行 ping 操作。如果 master 在一定的时间间隔内没有收到给定 worker 的响应,则 master 将在该 worker 中将工作定义为失败。在这种情况下,故障工作人员完成的所有地图任务都将被丢弃并交给另一个可用的工作人员。如果 worker 仍在处理 map 或 reduce 任务,也会发生类似的情况。请注意,如果 worker 已经完成了它的 reduce 部分,那么所有计算在它失败时已经完成,不需要重置。作为主要故障点,如果主节点失败,则所有作业都会失败。出于这个原因,可以为 master 定义定期检查点,以保存其数据结构。
•<strong>局部性:为了避免网络流量,该框架试图确保所有输入数据在本地可用于将对其执行计算的机器。在原始描述中,它使用 Google 文件系统 (GFS),复制因子设置为 3,块大小为 64 MB。这意味着相同的 64 MB 块(构成文件系统中的文件)将在三台不同的机器上具有相同的副本。主人知道块在哪里,并尝试在该机器上安排地图作业。如果失败,主服务器会尝试在任务输入数据的副本附近分配一台机器(即数据机器的同一机架中的工作机器)。
•<strong>任务粒度:假设每个map 阶段被分成M 块,每个Reduce 阶段被分成R 块,理想的情况是M 和R 远大于工作机器的数量。这是因为执行许多不同任务的工作人员提高了动态负载平衡。除此之外,它还提高了工人失败时的恢复速度(因为它已经完成的许多地图任务可以分散到所有其他机器上)。
•<strong>备份任务:有时,Map 或Reducer 工作人员的行为可能比集群中的其他工作人员慢得多。这可以保持总处理时间并使其等于单个慢速机器的处理时间。原始论文描述了一种称为备份任务的替代方案,当 MapReduce 操作接近完成时由主设备调度。这些是进行中任务的主控安排的任务。因此,当主要或备份完成时,MapReduce 操作完成。
•<strong>计数器:有时可能希望计算事件发生的次数。出于这个原因,计数在哪里创建。每个worker中的计数器值会定期传播到master。然后 master 聚合(是的。看起来 Pregel 聚合器来自这个地方)成功的 map 和 reduce 任务的计数器值,并在 MapReduce 操作完成时将它们返回给用户代码。在主状态中还有一个当前计数器值可用,因此观察进程的人可以跟踪它的行为方式。
好吧,我想有了上面所有的概念,Hadoop 对你来说就是小菜一碟。如果您对原始 MapReduce 文章或任何相关内容有任何疑问,请告诉我。
如果您熟悉 Python,以下是 MapReduce 的最简单解释:
In [2]: data = [1, 2, 3, 4, 5, 6]
In [3]: mapped_result = map(lambda x: x*2, data)
In [4]: mapped_result
Out[4]: [2, 4, 6, 8, 10, 12]
In [10]: final_result = reduce(lambda x, y: x+y, mapped_result)
In [11]: final_result
Out[11]: 42
查看如何单独处理每个原始数据段,在这种情况下,乘以 2(MapReduce 的映射部分)。基于mapped_result
,我们得出的结论是42
(MapReduce 的reduce部分)。
这个例子的一个重要结论是每个处理块不依赖于另一个块。例如,如果thread_1
maps[1, 2, 3]
和thread_2
maps [4, 5, 6]
,两个线程的最终结果仍然是[2, 4, 6, 8, 10, 12]
,但我们已经将处理时间减半。reduce 操作也是如此,这也是 MapReduce 在并行计算中的工作原理。
我不想听起来老套,但这对我帮助很大,而且很简单:
cat input | map | reduce > output