Mapreduce组合器

我有一个简单的mapreduce代码,包括mapper,reducer和combiner。 mapper的输出传递给组合器。 但是对于reducer而言,不是来自组合器的输出,而是传递mapper的输出。

请帮助

码:

package Combiner; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class AverageSalary { public static class Map extends Mapper { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] empDetails= value.toString().split(","); Text unit_key = new Text(empDetails[1]); DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2])); context.write(unit_key,salary_value); } } public static class Combiner extends Reducer { public void reduce(final Text key, final Iterable values, final Context context) { String val; double sum=0; int len=0; while (values.iterator().hasNext()) { sum+=values.iterator().next().get(); len++; } val=String.valueOf(sum)+":"+String.valueOf(len); try { context.write(key,new Text(val)); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static class Reduce extends Reducer { public void reduce (final Text key, final Text values, final Context context) { //String[] sumDetails=values.toString().split(":"); //double average; //average=Double.parseDouble(sumDetails[0]); try { context.write(key,values); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static void main(String args[]) { Configuration conf = new Configuration(); try { String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: Main  "); System.exit(-1); } Job job = new Job(conf, "Average salary"); //job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); job.setJarByClass(AverageSalary.class); job.setMapperClass(Map.class); job.setCombinerClass(Combiner.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true) ? 0 : -1); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } 

}

你好像忘记了合成器的重要属性:

键/值的输入类型和键/值的输出类型必须相同。

您不能接受Text/DoubleWritable并返回Text/Text 。 我建议你使用Text而不是DoubleWritable ,并在Combiner进行适当的解析。

Combiners的第一条规则是: 不要假设组合器会运行将组合器视为优化

无法保证Combiner可以运行您的所有数据。 在某些情况下,当数据不需要溢出到磁盘时,MapReduce将完全跳过使用Combiner。 另请注意,Combiner可能会在数据子集上运行多次! 每次泄漏都会运行一次。

在你的情况下,你正在做出这个错误的假设。 你应该在Combiner和Reducer中做总和。

此外,您还应该关注@ user987339的答案。 组合器的输入和输出必须相同(Text,Double – > Text,Double),它需要与Mapper的输出和Reducer的输入相匹配。

如果使用了combine函数,那么它与reduce函数的forms相同(并且是Reducer的一个实现),除了它的输出类型是中间键和值类型(K2和V2),因此它们可以提供reduce函数:map:(K1,V1)→list(K2,V2)组合:(K2,list(V2))→list(K2,V2)reduce:(K2,list(V2))→list(K3,V3)经常组合和减少function相同,在这种情况下,K3与K2相同,V3与V2相同。

运行mapreduce时, Combiner不会始终有效。

如果至少有三个溢出文件(映射器的输出写入本地磁盘),则组合器将执行,以便可以减小文件的大小,以便可以轻松地将其传输到reduce节点。

可以通过min.num.spills.for.combine属性设置组合器需要运行的溢出数量