MultipleOutputFormat in Hadoop

I am new to Hadoop. I am trying out the WordCount program.

Now, trying to get multiple output files, I am using MultipleOutputFormat. This link helped me do that: http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html

In my driver class I have:

    MultipleOutputs.addNamedOutput(conf, "even",
            org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, IntWritable.class);
    MultipleOutputs.addNamedOutput(conf, "odd",
            org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, IntWritable.class);

And my reduce class became this:

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        MultipleOutputs mos = null;

        public void configure(JobConf job) {
            mos = new MultipleOutputs(job);
        }

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            // Send even sums to the "even" named output and odd sums to the "odd" one.
            if (sum % 2 == 0) {
                mos.getCollector("even", reporter).collect(key, new IntWritable(sum));
            } else {
                mos.getCollector("odd", reporter).collect(key, new IntWritable(sum));
            }
            //output.collect(key, new IntWritable(sum));
        }

        @Override
        public void close() throws IOException {
            mos.close();
        }
    }

Things work fine, but I end up with a lot of files (one odd and one even file for every map-reduce).

Question: How can I have just 2 output files (odd and even), so that every odd output of every map-reduce gets written into that single odd file, and the same for even?

Each reducer uses an OutputFormat to write its records. That's why you get one set of odd and even files per reducer. This is by design, so that each reducer can perform its writes in parallel.

If you only want a single odd file and a single even file, you need to set mapred.reduce.tasks to 1. But performance will suffer, because all the mapper output gets funneled into a single reducer.
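
For example, in the driver from the question (old mapred API) this is a single line; it assumes conf is the same JobConf passed to addNamedOutput:

    // Force a single reducer so only one "even" and one "odd" file are produced.
    // Equivalent to setting mapred.reduce.tasks=1 (e.g. via -D mapred.reduce.tasks=1 with ToolRunner).
    conf.setNumReduceTasks(1);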

Another option is to change the process that reads these files to accept multiple input files, or to write a separate process that merges these files together.
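
A rough sketch of such a separate merge step using the HDFS FileSystem API (the class name and paths are illustrative, not from the answer; it assumes the named outputs land as even-*/odd-* part files in the job's output directory):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class MergeNamedOutputs {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            // Directory the word count job wrote to (illustrative path, adjust for your job).
            Path outDir = new Path("/user/hadoop/wordcount/output");
            for (String name : new String[] { "even", "odd" }) {
                // One merged file per named output, e.g. /user/hadoop/wordcount/even-merged.txt
                FSDataOutputStream out = fs.create(new Path(outDir.getParent(), name + "-merged.txt"), true);
                // Concatenate every part file written for this named output
                // (e.g. even-r-00000, even-r-00001, ...; exact names depend on the Hadoop version).
                for (FileStatus part : fs.globStatus(new Path(outDir, name + "-*"))) {
                    FSDataInputStream in = fs.open(part.getPath());
                    IOUtils.copyBytes(in, out, conf, false);
                    in.close();
                }
                out.close();
            }
        }
    }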

I wrote a class for this. Just use it in your job:

 job.setOutputFormatClass(m_customOutputFormatClass); 

Here is my class:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    /**
     * TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.
     * <p>
     * WARNING: The number of different folders shouldn't be large for one mapper, since we keep a
     * {@link RecordWriter} instance per folder name.
     * <p>
     * In this class the folder name is defined by the written entry's key.
     * To change this behavior simply extend this class, override the
     * {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own
     * {@link FolderNameExtractor} implementation.
     *
     * @author ykesten
     *
     * @param <K> - Keys type
     * @param <V> - Values type
     */
    public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> {

        private String folderName;

        private class MultipleFilesRecordWriter extends RecordWriter<K, V> {

            private Map<String, RecordWriter<K, V>> fileNameToWriter;
            private FolderNameExtractor<K, V> fileNameExtractor;
            private TaskAttemptContext job;

            public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) {
                fileNameToWriter = new HashMap<String, RecordWriter<K, V>>();
                this.fileNameExtractor = fileNameExtractor;
                this.job = job;
            }

            @Override
            public void write(K key, V value) throws IOException, InterruptedException {
                String fileName = fileNameExtractor.extractFolderName(key, value);
                RecordWriter<K, V> writer = fileNameToWriter.get(fileName);
                if (writer == null) {
                    writer = createNewWriter(fileName, fileNameToWriter, job);
                    if (writer == null) {
                        throw new IOException("Unable to create writer for path: " + fileName);
                    }
                }
                writer.write(key, value);
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) {
                    entry.getValue().close(context);
                }
            }
        }

        private synchronized RecordWriter<K, V> createNewWriter(String folderName,
                Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) {
            try {
                this.folderName = folderName;
                RecordWriter<K, V> writer = super.getRecordWriter(job);
                this.folderName = null;
                fileNameToWriter.put(folderName, writer);
                return writer;
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }

        @Override
        public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
            Path path = super.getDefaultWorkFile(context, extension);
            if (folderName != null) {
                String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName();
                path = new Path(newPath);
            }
            return path;
        }

        @Override
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            return new MultipleFilesRecordWriter(getFolderNameExtractor(), job);
        }

        public FolderNameExtractor<K, V> getFolderNameExtractor() {
            return new KeyFolderNameExtractor<K, V>();
        }

        public interface FolderNameExtractor<K, V> {
            public String extractFolderName(K key, V value);
        }

        private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> {
            public String extractFolderName(K key, V value) {
                return key.toString();
            }
        }
    }
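
A rough driver sketch for this output format (new mapreduce API; the mapper, reducer and path arguments are placeholders, not part of the original answer). Each distinct key then ends up under its own sub-folder of the output directory, e.g. <output>/<key>/part-r-00000:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MultiFolderDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "wordcount-multi-folder");
            job.setJarByClass(MultiFolderDriver.class);
            job.setMapperClass(TokenizerMapper.class);   // your mapper
            job.setReducerClass(IntSumReducer.class);    // your reducer
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Route each record into a sub-folder named after its key.
            job.setOutputFormatClass(HdMultipleFileOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }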

Multiple output files will be generated depending on the number of reducers.

You can use hadoop dfs -getmerge to merge the outputs.
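
For example (both paths are placeholders: the job's HDFS output directory and a local destination file):

    hadoop dfs -getmerge /user/hadoop/wordcount/output /tmp/wordcount-merged.txt

Note that -getmerge concatenates every file under the given directory into a single local file, so with this job the odd and even records end up in the same merged file.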