我在玩 Spark。它是来自网站的默认预构建发行版 (0.7.0),具有默认配置、集群模式、一名工作人员(我的本地主机)。我阅读了有关安装的文档,一切似乎都很好。
我有一个 CSV 文件(各种大小,1000 - 100 万行)。如果我用小输入文件(例如 1000 行)运行我的应用程序,一切都很好,程序在几秒钟内完成并产生预期的输出。但是当我提供更大的文件(100.000 行或 100 万行)时,执行失败。我试图挖掘日志,但没有太大帮助(它重复整个过程大约 9-10 次,然后退出失败。此外,还有一些与从某个空源获取失败有关的错误)。
第一个 JavaRDD 返回的结果 Iterable 对我来说是可疑的。如果我返回一个硬编码的单例列表(如 res.add("something"); return res;),那么一切都很好,即使有一百万行。但是,如果我添加我想要的所有键(28 个长度为 6-20 个字符的字符串),则该过程仅在大输入时才会失败。问题是,我需要所有这些键,这是实际的业务逻辑。
我正在使用 Linux amd64,四核,8GB 内存。最新的 Oracle Java7 JDK。火花配置:
SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar
我必须提到,当我启动程序时,它说:
13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
这是我的程序。它基于 JavaWordCount 示例,进行了最少的修改。
public final class JavaWordCount
{
public static void main(final String[] args) throws Exception
{
final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
System.getenv("SPARK_HOME"), new String[] {"....jar" });
final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(final String s)
{
// parsing "s" as the line, computation, building res (it's a List<String>)
return res;
}
});
final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(final String s)
{
return new Tuple2<String, Integer>(s, 1);
}
});
final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(final Integer i1, final Integer i2)
{
return i1 + i2;
}
});
counts.collect();
for (Tuple2<?, ?> tuple : counts.collect()) {
System.out.println(tuple._1 + ": " + tuple._2);
}
}
}