MapReduce job runs but throws an exception

Here is my code:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class SecondarySort extends Configured implements Tool {

        public static void main(String[] args) {
            try {
                ToolRunner.run(new Configuration(), new SecondarySort(), args);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        static class KeyPartitioner implements Partitioner<StockKey, DoubleWritable> {
            @Override
            public int getPartition(StockKey arg0, DoubleWritable arg1, int arg2) {
                int partition = arg0.name.hashCode() % arg2;
                return partition;
            }

            @Override
            public void configure(JobConf job) {
            }
        }

        static class StockKey implements WritableComparable<StockKey> {
            String name;
            Long timestamp;

            public StockKey() {
            }

            StockKey(String name, Long timestamp) {
                this.name = name;
                this.timestamp = timestamp;
            }

            @Override
            public void readFields(DataInput arg0) throws IOException {
                name = WritableUtils.readString(arg0);
                timestamp = arg0.readLong();
            }

            @Override
            public void write(DataOutput arg0) throws IOException {
                WritableUtils.writeString(arg0, name);
                arg0.writeLong(timestamp);
            }

            @Override
            public int compareTo(StockKey arg0) {
                int result = 0;
                result = name.compareToIgnoreCase(arg0.name);
                if (result == 0)
                    result = timestamp.compareTo(arg0.timestamp);
                return result;
            }

            public String toString() {
                String outputString = name + "," + timestamp;
                return outputString;
            }
        }

        static class StockReducer implements Reducer<StockKey, DoubleWritable, Text, Text> {
            public void reduce(StockKey key, Iterator<DoubleWritable> value,
                    OutputCollector<Text, Text> context, Reporter reporter)
                    throws IOException {
                Text k = new Text(key.toString());
                while (value.hasNext()) {
                    Double v = value.next().get();
                    Text t = new Text(v.toString());
                    context.collect(k, t);
                }
            }

            @Override
            public void configure(JobConf job) {
                // TODO Auto-generated method stub
            }

            @Override
            public void close() throws IOException {
                // TODO Auto-generated method stub
            }
        }

        static class StockMapper implements Mapper<LongWritable, Text, StockKey, DoubleWritable> {
            public void map(LongWritable offset, Text value,
                    OutputCollector<StockKey, DoubleWritable> context, Reporter reporter)
                    throws IOException {
                String[] values = value.toString().split(",");
                StockKey key = new StockKey(values[0].trim(), Long.parseLong(values[1].trim()));
                DoubleWritable val = new DoubleWritable(Double.parseDouble(values[2].trim()));
                try {
                    context.collect(key, val);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void configure(JobConf job) {
                // TODO Auto-generated method stub
            }

            @Override
            public void close() throws IOException {
                // TODO Auto-generated method stub
            }
        }

        @SuppressWarnings("unchecked")
        @Override
        public int run(String[] arg) throws Exception {
            JobConf conf = new JobConf(getConf(), SecondarySort.class);
            conf.setJobName(SecondarySort.class.getName());
            conf.setJarByClass(SecondarySort.class);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);
            conf.setMapOutputKeyClass(StockKey.class);
            conf.setMapOutputValueClass(Text.class);
            conf.setPartitionerClass((Class<? extends Partitioner>) KeyPartitioner.class);
            conf.setMapperClass((Class<? extends Mapper>) StockMapper.class);
            conf.setReducerClass((Class<? extends Reducer>) StockReducer.class);
            FileInputFormat.addInputPath(conf, new Path(arg[0]));
            FileOutputFormat.setOutputPath(conf, new Path(arg[1]));
            JobClient.runJob(conf);
            return 0;
        }
    }

Here is the exception:

    java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.DoubleWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:876)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:499)
        at SecondarySort$StockMapper.map(SecondarySort.java:135)
        at SecondarySort$StockMapper.map(SecondarySort.java:1)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
        at org.apache.hadoop.mapred.Child.main(Child.java:264)
    12/07/13 03:22:32 INFO mapred.JobClient: Task Id : attempt_201207130314_0002_m_000001_2, Status : FAILED
    java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.DoubleWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:876)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:499)
        at SecondarySort$StockMapper.map(SecondarySort.java:135)
        at SecondarySort$StockMapper.map(SecondarySort.java:1)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
        at org.apache.hadoop.mapred.Child.main(Child.java:264)

There are a number of potential problems with this code that could be causing it:

  • StockKey – you should override the default hashCode() method. At the moment two StockKeys with the same contents will have different hashCode values, because if you don't override it, the JVM default returns a number that is, to all intents and purposes, the memory address of the object. I know that in your partitioner you only use the name field (which is a String and will have a valid hashCode() implementation), but it's good practice in case you ever use the hashCode() of the whole StockKey object in the future and wonder why two identical stock keys end up on different reducers.
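
  For illustration, a minimal sketch of what such an override could look like, kept consistent with the case-insensitive compareTo() in your class (the multiplier 163 is an arbitrary convention, not anything StockKey requires):

    // Inside StockKey: hash and equality consistent with compareTo()
    @Override
    public int hashCode() {
        // Lower-case the name so that keys compareTo() treats as equal
        // also hash to the same value
        return name.toLowerCase().hashCode() * 163 + timestamp.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StockKey)) {
            return false;
        }
        StockKey other = (StockKey) o;
        return name.equalsIgnoreCase(other.name)
                && timestamp.equals(other.timestamp);
    }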

  • KeyPartitioner – you need to Math.abs(..) the result. At the moment the hash code may be negative, and a negative number modulo the number of reducers is still negative. The knock-on effect is that the MR framework will throw an exception, because it expects a number between 0 (inclusive) and the number of reducers (exclusive). This may be where your problem lies, which I'll explain in the next point.
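
  For example, a sketch of a fixed partitioner (I've renamed the arguments for readability; taking the absolute value after the modulo also sidesteps the Math.abs(Integer.MIN_VALUE) edge case):

    static class KeyPartitioner implements Partitioner<StockKey, DoubleWritable> {
        @Override
        public int getPartition(StockKey key, DoubleWritable value, int numPartitions) {
            // name.hashCode() may be negative; Math.abs(..) of the modulo
            // result keeps the partition in the range [0, numPartitions)
            return Math.abs(key.name.hashCode() % numPartitions);
        }

        @Override
        public void configure(JobConf job) {
        }
    }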

  • Mapper.map method – you are swallowing any potential output exception when you call context.collect. Following on from my previous point about the partitioner – if it returns a negative number, an exception will be thrown and you need to handle it. In some cases catching and swallowing an exception may be fine (data validation of input records, for example), but any exception that occurs on output should be thrown up to the MR framework to flag that something went wrong and that the output of this mapper is wrong / incomplete:

    try {
        context.collect(key, val);
    } catch (IOException e) {
        e.printStackTrace();
    }
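
  Letting the error propagate instead can be as simple as the following (your map method already declares throws IOException, so nothing else is needed):

    // Let any output error bubble up so the framework fails the task,
    // rather than silently producing incomplete output
    context.collect(key, val);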
  • Finally, you need to explicitly declare your map and reduce output types (this is what is causing the exception, as you currently declare the map output value type as Text when in fact the mapper is outputting a DoubleWritable):

    conf.setMapOutputKeyClass(StockKey.class);
    conf.setMapOutputValueClass(DoubleWritable.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

I would suggest you remove the try / catch block around the context.collect call and re-run your job (or just check the logs of the failed map tasks and see whether the stack trace shows up there).