如何设置一个reducer来发送和一个mapper来接收?

我正在 Hadoop 上用 MapReduce 开发一些代码,它使用两个 mapper 和两个 reducer。我被告知要使用 SequenceFileInputFormat 和 SequenceFileOutputFormat,来让第一个 reducer 的输出能够作为第二个 mapper 的输入。问题是我遇到了一个错误,在 Google 上搜索了很久也不知道为什么。

错误:

java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text

即 map 输出的键类型不匹配:期望 org.apache.hadoop.io.IntWritable,实际收到 org.apache.hadoop.io.Text

代码:

package casoTaxis; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; public class Eje1{ public static class MapperJob1 extends Mapper { //El metodo map recibe un conjunto clave-valor, lo procesa y lo vuelca en un contexto.adasdadada public void map(Object key, Text value, Context context) throws IOException, InterruptedException { Text hackLicense; IntWritable totalAmount; //salidas StringTokenizer itr = new StringTokenizer(value.toString(), ","); itr.nextToken(); hackLicense = new Text(itr.nextToken()); for(int i=2; i<itr.countTokens(); i++) itr.nextToken(); totalAmount = new IntWritable( Integer.parseInt(itr.nextToken()) ); context.write(hackLicense, totalAmount); } } public static class ReducerJob1 extends Reducer { //No encontre una clase InpuFormat que sea Text, IntWritable public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } public static class MapperJob2 extends Mapper { //El metodo map recibe un conjunto clave-valor, lo procesa y lo vuelca en un contexto.adasdadada public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException { context.write(key, value); } } public static class ReducerJob2 extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, 
InterruptedException { int max = 0; for (IntWritable val : values) { int maxVal = val.get(); if( maxVal>max ) max = maxVal; } String licencia = "Conductor con licencia = " + key; String recaudacion = "Recaudacion = " + max; context.write(new Text(licencia), new Text(recaudacion)); } } public static void main(String[] args) throws Exception { Configuration conf1 = new Configuration(); Configuration conf2 = new Configuration(); //conf2.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " "); Job job1 = Job.getInstance(conf1, "Eje1-Job1"); Job job2 = Job.getInstance(conf2, "Eje1-Job2"); job1.setJarByClass(Eje1.class); job2.setJarByClass(Eje1.class); job1.setMapperClass(MapperJob1.class); job2.setMapperClass(MapperJob2.class); job1.setReducerClass(ReducerJob1.class); job2.setReducerClass(ReducerJob2.class); job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(IntWritable.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(IntWritable.class); job2.setMapOutputKeyClass(Text.class); job2.setMapOutputKeyClass(IntWritable.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); job1.setOutputFormatClass(SequenceFileOutputFormat.class); job2.setInputFormatClass(SequenceFileInputFormat.class);///asdasdads FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, pathIntermedio); FileInputFormat.addInputPath(job2, pathIntermedio); FileOutputFormat.setOutputPath(job2, new Path(args[1])); job1.waitForCompletion(true); System.exit(job2.waitForCompletion(true) ? 0 : 1); } private static final Path pathIntermedio = new Path("intermediate_output"); } 

为什么我收到此错误? 有没有更好的方法来实现这一目标?

错误在于以下几行:

 job2.setMapOutputKeyClass(Text.class); job2.setMapOutputKeyClass(IntWritable.class); 

其中第二个应该是:

 job2.setMapOutputValueClass(IntWritable.class);