使用MultithreadMapper替换Mapper时,键入地图中的键不匹配

我想为MapReduce作业实现一个MultithreadMapper。

为此,我在一个工作代码中用MultithreadMapper替换了Mapper。

这是我得到的例外:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862) at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549) at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211) at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264) 

这是代码设置:

  public static void main(String[] args) { try { if (args.length != 2) { System.err.println("Usage: MapReduceMain  "); System.exit(123); } Job job = new Job(); job.setJarByClass(MapReduceMain.class); job.setInputFormatClass(TextInputFormat.class); FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration()); FileStatus[] files = fs.listStatus(new Path(args[0])); for(FileStatus sfs:files){ FileInputFormat.addInputPath(job, sfs.getPath()); } FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MyMultithreadMapper.class); job.setReducerClass(MyReducer.class); MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(MyPage.class); job.setOutputFormatClass(SequenceFileOutputFormat.class);//write the result as sequential file System.exit(job.waitForCompletion(true) ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } } 

这是映射器的代码:

 public class MyMultithreadMapper extends MultithreadedMapper { ConcurrentLinkedQueue scrapers = new ConcurrentLinkedQueue(); public static final int nThreads = 5; public MyMultithreadMapper() { for (int i = 0; i < nThreads; i++) { scrapers.add(new MyScraper()); } } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { MyScraper scraper = scrapers.poll(); MyPage result = null; for (int i = 0; i < 10; i++) { try { result = scraper.scrapPage(value.toString(), true); break; } catch (Exception e) { e.printStackTrace(); } } if (result == null) { result = new MyPage(); result.setUrl(key.toString()); } context.write(new IntWritable(result.getUrl().hashCode()), result); scrapers.add(scraper); } 

我为什么要这个?

这是必须做的事情:

MultithreadedMapper.setMapperClass(job,MyMapper.class);

MyMapper必须实现map逻辑

MultithreadMapper必须为空