我想将不同的文件附加到不同的减速器。是否可以在hadoop中使用分布式缓存技术?
我能够将相同的文件(文件)附加到所有减速器。但是由于内存限制,我想知道我是否可以将不同的文件附加到不同的减速器。
如果这是一个无知的问题,请原谅我。
请帮忙!
提前致谢!
我想将不同的文件附加到不同的减速器。是否可以在hadoop中使用分布式缓存技术?
我能够将相同的文件(文件)附加到所有减速器。但是由于内存限制,我想知道我是否可以将不同的文件附加到不同的减速器。
如果这是一个无知的问题,请原谅我。
请帮忙!
提前致谢!
此外,可能值得尝试使用内存计算/数据网格技术,如 GridGain、Infinispan 等......这样您就可以将数据加载到内存中,并且您对如何映射计算作业没有任何限制(使用数据亲和性映射/减少)到任何数据。
这是一个奇怪的愿望,因为任何 reducer 都没有绑定到特定节点,并且在执行期间,reducer 可以在任何节点甚至节点上运行(如果存在故障或推测执行)。因此,所有 reducer 都应该是同质的,唯一不同的是它们处理的数据。
所以我想当你说你想把不同的文件放在不同的减速器上时,你实际上想把不同的文件放在减速器上,这些文件应该对应于那些减速器将要处理的数据(键)。
我知道的唯一一种方法是将数据放在 HDFS 上,并在 reducer 开始处理数据时从它读取。
package com.a;
import javax.security.auth.login.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class PrefixNew4Reduce4 extends MapReduceBase implements Reducer<Text, Text, Text, Text>{
// @SuppressWarnings("unchecked")
ArrayList<String> al = new ArrayList<String>();
public void configure(JobConf conf4)
{
String from = "home/users/mlakshm/haship";
OutputStream dst = null;
try {
dst = new BufferedOutputStream(new FileOutputStream(to, false));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} /* src (hdfs file) something like hdfs://127.0.0.1:8020/home/rystsov/hi */
FileSystem fs = null;
try {
fs = FileSystem.get(new URI(from), conf4);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
FSDataInputStream src;
try {
src = fs.open(new Path(from));
String val = src.readLine();
StringTokenizer st = new StringTokenizer(val);
al.add(val);
System.out.println("val:----------------->"+val);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
StringTokenizer stk = new StringTokenizer(key.toString());
String t = stk.nextToken();
String i = stk.nextToken();
String j = stk.nextToken();
ArrayList<String> al1 = new ArrayList<String>();
for(int i = 0; i<al.size(); i++)
{
boolean a = (al.get(i).equals(i)) || (al.get(i).equals(j));
if(a==true)
{
output.collect(key, new Text(al.get(i));
}
while(values.hasNext())
{
String val = values.next().toString();
al1.add(val);
}
for(int i = 0; i<al1.size(); i++)
{
output.collect(key, new Text(al1.get(i));
}