Mapfile作为MapReduce作业的输入

我最近开始使用Hadoop,我在使用Mapfile作为MapReduce作业的输入时遇到了问题。

下面的工作代码在hdfs中编写了一个名为“TestMap”的简单MapFile,其中有三个Text类型的键和三个类型为BytesWritable的值。

这里是TestMap的内容:

$ hadoop fs -text /user/hadoop/TestMap/data 11/01/20 11:17:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library 11/01/20 11:17:58 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 11/01/20 11:17:58 INFO compress.CodecPool: Got brand-new decompressor A 01 B 02 C 03 

这是创建TestMap Mapfile的程序:

 import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.IOUtils; public class CreateMap { public static void main(String[] args) throws IOException{ Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf); Text key = new Text(); BytesWritable value = new BytesWritable(); byte[] data = {1, 2, 3}; String[] strs = {"A", "B", "C"}; int bytesRead; MapFile.Writer writer = null; writer = new MapFile.Writer(conf, hdfs, "TestMap", key.getClass(), value.getClass()); try { for (int i = 0; i < 3; i++) { key.set(strs[i]); value.set(data, i, 1); writer.append(key, value); System.out.println(strs[i] + ":" + data[i] + " added."); } } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeStream(writer); } } } 

后面的简单MapReduce作业尝试将mapfile的值递增1:

 import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.io.BytesWritable; public class AddOne extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper { public void map(Text key, BytesWritable value, OutputCollector output, Reporter reporter) throws IOException { byte[] data = value.getBytes(); data[0] += 1; value.set(data, 0, 1); output.collect(key, new Text(value.toString())); } } public static class Reduce extends MapReduceBase implements Reducer { public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { output.collect(key, values.next()); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, AddOne.class); Path in = new Path("TestMap"); Path out = new Path("output"); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("AddOne"); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormat(SequenceFileInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.set("key.value.separator.in.input.line", ":"); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new AddOne(), args); System.exit(res); } } 

我得到的运行时exception是:

 java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable at AddOne$MapClass.map(AddOne.java:32) at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) at org.apache.hadoop.mapred.Child.main(Child.java:170) 

我不明白为什么hadoop试图强制转换LongWritable,因为在我的代码中我正确定义了Mapper接口( Mapper )。

有人能帮助我吗?

非常感谢你

卢卡

你的问题来自这样一个事实:尽管名称告诉你, MapFile 不是一个文件。

MapFile实际上是一个由两个文件组成的目录:有一个“数据”文件,它是一个包含你写入的键和值的SequenceFile ; 但是,还有一个“索引”文件,它是一个不同的SequenceFile,包含键的子序列及其作为LongWritables的偏移量; 此索引由MapFile.Reader加载到内存中,以便您快速进行二进制搜索,以便在数据文件中查找具有随机访问时所需数据的偏移量。

您正在使用SequenceFileInputFormat的旧“org.apache.hadoop.mapred”版本 。 当你告诉它将MapFile作为输入时,知道只看数据文件是不够聪明的。 相反,它实际上尝试将数据文件索引文件用作常规输入文件。 数据文件将正常工作,因为类与您指定的内容一致,但索引文件将抛出ClassCastException,因为索引文件值都是LongWritables。

您有两个选择:您可以开始使用SequenceFileInputFormat的“org.apache.hadoop.mapreduce”版本 (从而更改代码的其他部分),它对MapFiles有足够的了解,只需查看数据文件; 或者,您可以显式地将数据文件作为您想要输入的文件。

其中一种方法可能是使用自定义InputFormat,其中一个记录用于整个MapFile块,并通过map()中的键进行查找

我使用KeyValueTextInputFormat.class解决了同样的问题

已经提到了整个方法

http://sanketraut.blogspot.in/2012/06/hadoop-example-setting-up-hadoop-on.html