hadoop方法将输出发送到多个目录

我的MapReduce作业按日期处理数据,需要将输出写入某个文件夹结构。 目前的期望是产生以下结构:

 2013 01 02 .. 2012 01 02 .. 

等等

在任何时候,我只获得长达12个月的数据,因此,我使用MultipleOutputs类在驱动程序中使用以下函数创建12个输出:

 public void createOutputs(){ Calendar c = Calendar.getInstance(); String monthStr, pathStr; // Create multiple outputs for last 12 months // TODO make 12 configurable for(int i = 0; i  10 ? "" + month : "0" + month ; // Generate path string in the format 2013/03/etl pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl"; // Add the named output MultipleOutputs.addNamedOutput(config, pathStr ); // Move to previous month c.add(Calendar.MONTH, -1); } } 

在reducer中,我添加了一个清理函数来将生成的输出移动到适当的目录。

 protected void cleanup(Context context) throws IOException, InterruptedException { // Custom function to recursively process data moveFiles (FileSystem.get(new Configuration()), new Path("/MyOutputPath")); } 

问题:在将输出从_temporary目录移动到输出目录之前,正在执行reducer的清除function。 因此,上述function在执行时看不到任何输出,因为所有数据仍在_temporary目录中。

对我来说,实现所需function的最佳方式是什么? 欣赏任何见解。

考虑以下事项:

  • 有没有办法使用自定义outputcommitter?
  • 是否更好地将另一份工作联系在一起,或者这样做是否有点过分?
  • 是否有一个更简单的替代方案,我只是不知道..

以下是cleanupfunction的文件结构示例日志:

 MyMapReduce: filepath:hdfs://localhost:8020/dev/test MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0 MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000 MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000 

你不应该需要第二份工作。 我目前正在使用MultipleOutputs在我的一个程序中创建大量的输出目录。 尽管有超过30个目录,但我只能使用几个MultipleOutputs对象。 这是因为您可以在编写时设置输出目录,因此只能在需要时确定。 如果要以不同的格式输出,实际上只需要多个namedOutput(例如,一个带键:Text.class,value:Text.class,一个带键:Text.class和Value:IntWritable.class)

建立:

 MultipleOutputs.addNamedOutput(job, "Output", TextOutputFormat.class, Text.class, Text.class); 

减速机的设置:

 mout = new MultipleOutputs(context); 

在reducer中调用mout:

 String key; //set to whatever output key will be String value; //set to whatever output value will be String outputFileName; //set to absolute path to file where this should write mout.write("Output",new Text(key),new Text(value),outputFileName); 

您可以在编码时使用一段代码确定目录。 例如,假设您要按月和年指定目录:

 int year;//extract year from data int month;//extract month from data String baseFileName; //parent directory to all outputs from this job String outputFileName = baseFileName + "/" + year + "/" + month; mout.write("Output",new Text(key),new Text(value),outputFileName); 

希望这可以帮助。

编辑:以上示例的输出文件结构:

 Base 2013 01 02 03 ... 2012 01 ... ... 

很可能你错过了在清理中关闭mos。

如果你有mapper或reducer中的设置,如下所示:

 public void setup(Context context) {mos = new MultipleOutputs(context);} 

你应该在清理开始时关闭mos,如下所示..

 public void cleanup(Context context ) throws IOException, InterruptedException {mos.close();}