使用MapReduce中的globStatus过滤输入文件

我有很多输入文件,我想根据最后附加的日期处理选定的文件。 我现在很困惑我在哪里使用globStatus方法来过滤掉文件。

我有一个自定义的RecordReader类,我试图在其下一个方法中使用globStatus,但它没有成功。

public boolean next(Text key, Text value) throws IOException { Path filePath = fileSplit.getPath(); if (!processed) { key.set(filePath.getName()); byte[] contents = new byte[(int) fileSplit.getLength()]; value.clear(); FileSystem fs = filePath.getFileSystem(conf); fs.globStatus(new Path("/*" + date)); FSDataInputStream in = null; try { in = fs.open(filePath); IOUtils.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); } finally { IOUtils.closeStream(in); } processed = true; return true; } return false; } 

我知道它返回一个FileStatus数组,但我如何使用它来过滤文件。 有人可以请一些亮点吗?

globStatus方法有2个免费参数,允许您过滤文件。 第一个是glob模式,但有时glob模式不足以过滤特定文件,在这种情况下,您可以定义PathFilter

关于glob模式,支持以下内容:

 Glob | Matches ------------------------------------------------------------------------------------------------------------------- * | Matches zero or more characters ? | Matches a single character [ab] | Matches a single character in the set {a, b} [^ab] | Matches a single character not in the set {a, b} [ab] | Matches a single character in the range [a, b] where a is lexicographically less than or equal to b [^ab] | Matches a single character not in the range [a, b] where a is lexicographically less than or equal to b {a,b} | Matches either expression a or b \c | Matches character c when it is a metacharacter 

PathFilter只是一个这样的接口:

 public interface PathFilter { boolean accept(Path path); } 

因此,您可以实现此接口并实现accept方法,您可以将逻辑放在过滤文件中。

Tom White的优秀书籍中的一个示例,它允许您定义PathFilter以过滤与特定正则表达式匹配的文件:

 public class RegexExcludePathFilter implements PathFilter { private final String regex; public RegexExcludePathFilter(String regex) { this.regex = regex; } public boolean accept(Path path) { return !path.toString().matches(regex); } } 

您可以在初始化作业时通过调用FileInputFormat.setInputPathFilter(JobConf, RegexExcludePathFilter.class)直接使用PathFilter实现过滤输入。

编辑 :由于你必须在setInputPathFilter传递类,你不能直接传递参数,但你应该能够通过使用Configuration来做类似的事情。 如果使RegexExcludePathFilter也从Configured扩展,您可以使用所需的值返回一个之前已初始化的Configuration对象,这样您就可以在filter中找回这些值并在accept处理它们。

例如,如果您初始化如下:

 conf.set("date", "2013-01-15"); 

然后你可以像这样定义你的filter:

 public class RegexIncludePathFilter extends Configured implements PathFilter { private String date; private FileSystem fs; public boolean accept(Path path) { try { if (fs.isDirectory(path)) { return true; } } catch (IOException e) {} return path.toString().endsWith(date); } public void setConf(Configuration conf) { if (null != conf) { this.date = conf.get("date"); try { this.fs = FileSystem.get(conf); } catch (IOException e) {} } } } 

编辑2 :原始代码存在一些问题,请参阅更新的类。 您还需要删除构造函数,因为它不再使用,并检查是否是一个目录,在这种情况下您应该返回true,以便也可以过滤目录的内容。

对于阅读此内容的任何人,我可以说“请不要在filter中做任何比validation路径更复杂的事情”。 具体来说:不要检查作为目录的文件,获取它们的大小等。等待列表/ glob操作返回,然后使用填充的FileStatus条目中的信息在那里进行过滤。

为什么? 所有这些直接或通过isDirectory()调用getFileStatus()对文件系统进行不必要的调用,这些调用会在HDFS集群上添加不必要的namenode负载。 更重要的是,针对S3和其他对象存储,每个操作都可能发出多个HTTPS请求 – 这些确实需要花费可测量的时间。 更好的是,如果S3认为您在整个机器群集中发出过多请求,S3会限制您。 你不希望这样。

直到调用之后 – 你得到的文件状态条目是来自对象存储列表命令的那些,它通常每个HTTPS请求返回数千个文件条目,因此效率更高。

有关更多详细信息,请检查org.apache.hadoop.fs.s3a.S3AFileSystem的源代码。