Hadoop mapper and reducer output mismatch

I am trying to configure different mapper and reducer output types by using setMapOutputKeyClass, setMapOutputValueClass, setOutputKeyClass and setOutputValueClass. However, even after calling these methods, I still get an error at runtime.

Here is my code:

    package org.myorg;

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class Sort {

        public static class Map extends MapReduceBase
                implements Mapper<LongWritable, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(LongWritable key, Text value,
                            OutputCollector<Text, IntWritable> output, Reporter reporter)
                    throws IOException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    output.collect(word, one);
                }
            }
        }

        public static class Reduce extends MapReduceBase
                implements Reducer<Text, IntWritable, Text, LongWritable> {
            public void reduce(Text key, Iterator<IntWritable> values,
                               OutputCollector<Text, LongWritable> output, Reporter reporter)
                    throws IOException {
                int sum = 0;
                while (values.hasNext()) {
                    sum += values.next().get();
                }
                output.collect(key, new LongWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(Sort.class);
            conf.setJobName("sort");

            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            // map output types
            conf.setMapOutputKeyClass(Text.class);
            conf.setMapOutputValueClass(IntWritable.class);
            // final (reducer) output types
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(LongWritable.class);

            JobClient.runJob(conf);
        }
    }

The error message I get:

    java.lang.Exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.IntWritable
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
    Caused by: java.io.IOException: wrong value class: class org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.IntWritable
        at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:168)
        at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1160)
        at org.myorg.Sort$Reduce.reduce(Sort.java:34)
        at org.myorg.Sort$Reduce.reduce(Sort.java:28)
        at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1436)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1441)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1303)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:431)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
    13/10/12 14:08:11 INFO mapred.JobClient: map 0% reduce 0%
    13/10/12 14:08:11 INFO mapred.JobClient: Job complete: job_local599611407_0001
    13/10/12 14:08:11 INFO mapred.JobClient: Counters: 0
    13/10/12 14:08:11 INFO mapred.JobClient: Job Failed: NA
    Exception in thread "main" java.io.IOException: Job failed!
        at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1357)
        at org.myorg.Sort.main(Sort.java:57)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:160)

Am I doing something wrong? Thanks for your help!

Comment out the following line and the program should work. The explanation of the problem is below.

    conf.setCombinerClass(Reduce.class);

The combiner runs on the map side, so its output key/value classes must match the map output classes you declared (Text / IntWritable). Since Reduce is registered as the combiner but emits LongWritable values, the map-side spill fails with "wrong value class: LongWritable is not IntWritable".

Another solution is to write a reducer whose input and output types are the same, so that the reducer class can also be used as the combiner class.
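
For illustration, here is a minimal sketch of that alternative, assuming the value type stays IntWritable from the mapper through to the final output (the class and method names mirror the question's code; this is not the asker's original implementation, and it would drop into the same Sort class with the same imports):

    // Sketch: input and output value types are both IntWritable, so the same
    // class can safely be registered with setCombinerClass and setReducerClass.
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get(); // sum the partial counts
            }
            output.collect(key, new IntWritable(sum)); // same value class as the map output
        }
    }

With this variant, main would call conf.setOutputValueClass(IntWritable.class) instead of LongWritable.class, and the conf.setCombinerClass(Reduce.class) line could stay.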