CombineFileInputFormat Hadoop 0.20.205的实现

有人可以指出我在哪里可以找到CombineFileInputFormat的实现(org。使用Hadoop 0.20.205?这是使用EMR从非常小的日志文件(行中文本)创建大分割。

令人惊讶的是,Hadoop没有专门为此目的而制作的这个类的默认实现,并且谷歌搜索看起来我不是唯一一个被此混淆的人。 我需要编译类并将其捆绑在一个jar中,用于hadoop-streaming,对Java的知识有限,这是一个挑战。

编辑:我已经尝试过yetitrails示例,使用了必要的导入,但是我为下一个方法得到了编译器错误。

这是我给你的一个实现:

 import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LineRecordReader; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.CombineFileInputFormat; import org.apache.hadoop.mapred.lib.CombineFileRecordReader; import org.apache.hadoop.mapred.lib.CombineFileSplit; @SuppressWarnings("deprecation") public class CombinedInputFormat extends CombineFileInputFormat { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public RecordReader getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException { return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class); } public static class myCombineFileRecordReader implements RecordReader { private final LineRecordReader linerecord; public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException { FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations()); linerecord = new LineRecordReader(conf, filesplit); } @Override public void close() throws IOException { linerecord.close(); } @Override public LongWritable createKey() { // TODO Auto-generated method stub return linerecord.createKey(); } @Override public Text createValue() { // TODO Auto-generated method stub return linerecord.createValue(); } @Override public long getPos() throws IOException { // TODO Auto-generated method stub return linerecord.getPos(); } @Override public float getProgress() throws IOException { // TODO Auto-generated method stub return linerecord.getProgress(); } @Override public boolean next(LongWritable key, Text value) throws IOException { // TODO Auto-generated method stub return linerecord.next(key, value); } } } 

在您的工作中,首先根据您希望组合输入文件的大小设置参数mapred.max.split.size 。 在run()中执行以下操作

 ... if (argument != null) { conf.set("mapred.max.split.size", argument); } else { conf.set("mapred.max.split.size", "134217728"); // 128 MB } ... conf.setInputFormat(CombinedInputFormat.class); ...