我听到了很多关于 map/reduce 的信息,尤其是在 Google 大规模并行计算系统的背景下。它到底是什么?
5 回答
来自谷歌MapReduce研究发布页面的摘要:
MapReduce 是一种用于处理和生成大型数据集的编程模型和相关实现。用户指定一个处理键/值对以生成一组中间键/值对的 map 函数,以及一个合并与同一中间键关联的所有中间值的 reduce 函数。
MapReduce 的优点是可以在多个处理节点(多个服务器)上并行执行处理,因此它是一个可以很好扩展的系统。
由于它基于函数式编程模型,map
并且reduce
每个步骤都没有任何副作用(map
流程的每个子部分的状态和结果不依赖于另一个),因此被映射和缩减的数据集可以各自分离在多个处理节点上。
Joel你的编程语言能做到这一点吗?文章讨论了谷歌如何理解函数式编程以提出为其搜索引擎提供支持的 MapReduce。如果您不熟悉函数式编程以及它如何允许可扩展代码,那么这是一本非常好的读物。
另请参阅:维基百科:MapReduce
相关问题:请简单解释一下mapreduce
Map 是一个函数,它将另一个函数应用于列表中的所有项目,以生成另一个列表,其中包含所有返回值。(“将 f 应用于 x”的另一种说法是“调用 f,将其传递给 x”。所以有时说“应用”而不是“调用”听起来更好。)
这就是 map 可能是用 C# 编写的(它被称为Select
并且在标准库中):
public static IEnumerable<R> Select<T, R>(this IEnumerable<T> list, Func<T, R> func)
{
foreach (T item in list)
yield return func(item);
}
由于您是 Java 哥们儿,而 Joel Spolsky 喜欢向 GROSSLY UNFAIR LIES 讲述 Java 是多么糟糕(实际上,他并没有撒谎,它很糟糕,但我试图赢得您的支持),这是我非常粗略的尝试一个 Java 版本(我没有 Java 编译器,我隐约记得 Java 版本 1.1!):
// represents a function that takes one arg and returns a result
public interface IFunctor
{
object invoke(object arg);
}
public static object[] map(object[] list, IFunctor func)
{
object[] returnValues = new object[list.length];
for (int n = 0; n < list.length; n++)
returnValues[n] = func.invoke(list[n]);
return returnValues;
}
我相信这可以通过一百万种方式得到改善。但这是基本的想法。
Reduce 是一个将列表中的所有项目转换为单个值的函数。为此,需要为其提供另一个func
将两个项目转换为单个值的函数。将前两项交给func
. 然后是第三项的结果。然后是第四个项目的结果,依此类推,直到所有项目都消失了,我们只剩下一个值。
在 C# 中,reduce 被调用Aggregate
并再次出现在标准库中。我将直接跳到 Java 版本:
// represents a function that takes two args and returns a result
public interface IBinaryFunctor
{
object invoke(object arg1, object arg2);
}
public static object reduce(object[] list, IBinaryFunctor func)
{
if (list.length == 0)
return null; // or throw something?
if (list.length == 1)
return list[0]; // just return the only item
object returnValue = func.invoke(list[0], list[1]);
for (int n = 1; n < list.length; n++)
returnValue = func.invoke(returnValue, list[n]);
return returnValue;
}
这些 Java 版本需要添加泛型,但我不知道如何在 Java 中做到这一点。但是您应该能够将匿名内部类传递给它们以提供函子:
string[] names = getLotsOfNames();
string commaSeparatedNames = (string)reduce(names,
new IBinaryFunctor {
public object invoke(object arg1, object arg2)
{ return ((string)arg1) + ", " + ((string)arg2); }
}
希望泛型能够摆脱演员表。C# 中的类型安全等效项是:
string commaSeparatedNames = names.Aggregate((a, b) => a + ", " + b);
为什么这很“酷”?将较大的计算分解为较小的部分的简单方法,以便它们可以以不同的方式重新组合在一起,总是很酷。Google 应用这个想法的方式是并行化,因为 map 和 reduce 都可以在多台计算机上共享。
但关键要求不是您的语言可以将函数视为值。任何 OO 语言都可以做到这一点。并行化的实际要求是func
您传递给 map 和 reduce 的小函数不得使用或更新任何状态。它们必须返回一个仅依赖于传递给它们的参数的值。否则,当您尝试并行运行整个事情时,结果将完全搞砸。
在对非常长的华夫饼或非常短的模糊博客文章感到最沮丧之后,我最终发现了这篇非常好的严谨简洁的论文。
然后我继续通过翻译成 Scala 使其更简洁,在这里我提供了最简单的情况,用户只需指定应用程序的map
和reduce
部分。在 Hadoop/Spark 中,严格来说,采用了更复杂的编程模型,需要用户明确指定此处概述的另外 4 个函数:http ://en.wikipedia.org/wiki/MapReduce#Dataflow
import scalaz.syntax.id._
trait MapReduceModel {
type MultiSet[T] = Iterable[T]
// `map` must be a pure function
def mapPhase[K1, K2, V1, V2](map: ((K1, V1)) => MultiSet[(K2, V2)])
(data: MultiSet[(K1, V1)]): MultiSet[(K2, V2)] =
data.flatMap(map)
def shufflePhase[K2, V2](mappedData: MultiSet[(K2, V2)]): Map[K2, MultiSet[V2]] =
mappedData.groupBy(_._1).mapValues(_.map(_._2))
// `reduce` must be a monoid
def reducePhase[K2, V2, V3](reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)])
(shuffledData: Map[K2, MultiSet[V2]]): MultiSet[V3] =
shuffledData.flatMap(reduce).map(_._2)
def mapReduce[K1, K2, V1, V2, V3](data: MultiSet[(K1, V1)])
(map: ((K1, V1)) => MultiSet[(K2, V2)])
(reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)]): MultiSet[V3] =
mapPhase(map)(data) |> shufflePhase |> reducePhase(reduce)
}
// Kinda how MapReduce works in Hadoop and Spark except `.par` would ensure 1 element gets a process/thread on a cluster
// Furthermore, the splitting here won't enforce any kind of balance and is quite unnecessary anyway as one would expect
// it to already be splitted on HDFS - i.e. the filename would constitute K1
// The shuffle phase will also be parallelized, and use the same partition as the map phase.
abstract class ParMapReduce(mapParNum: Int, reduceParNum: Int) extends MapReduceModel {
def split[T](splitNum: Int)(data: MultiSet[T]): Set[MultiSet[T]]
override def mapPhase[K1, K2, V1, V2](map: ((K1, V1)) => MultiSet[(K2, V2)])
(data: MultiSet[(K1, V1)]): MultiSet[(K2, V2)] = {
val groupedByKey = data.groupBy(_._1).map(_._2)
groupedByKey.flatMap(split(mapParNum / groupedByKey.size + 1))
.par.flatMap(_.map(map)).flatten.toList
}
override def reducePhase[K2, V2, V3](reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)])
(shuffledData: Map[K2, MultiSet[V2]]): MultiSet[V3] =
shuffledData.map(g => split(reduceParNum / shuffledData.size + 1)(g._2).map((g._1, _)))
.par.flatMap(_.map(reduce))
.flatten.map(_._2).toList
}
Map 是可以应用于数组的原生 JS 方法。由于某些函数映射到原始数组中的每个元素,它会创建一个新数组。因此,如果您映射了一个函数(元素){ return element * 2;},它将返回一个新数组,其中每个元素都加倍。原始数组不会被修改。
https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/map
Reduce 是一种原生 JS 方法,也可以应用于数组。它将函数应用于数组并具有称为累加器的初始输出值。它遍历数组中的每个元素,应用一个函数,并将它们减少为一个值(从累加器开始)。它很有用,因为你可以有任何你想要的输出,你只需要从那种类型的累加器开始。因此,如果我想将某些东西简化为对象,我将从累加器 {} 开始。
https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/Reduce?v=a
在大数据的情况下:
1.MapReduce is a Hadoop framework used for writing applications
2.MapReduce provides analytical capabilities for analyzing huge volumes of
complex data
3.facilitates concurrent processing by splitting petabytes of data into smaller
chunks, and processing them in parallel on Hadoop commodity servers
4.used for querying and selecting data in the Hadoop Distributed File System
5.MapReduce programming enables companies to access new sources of data