使用MapReduce中的globStatus过滤输入文件
我有很多输入文件,我想根据最后附加的日期处理选定的文件。 我现在很困惑我在哪里使用globStatus方法来过滤掉文件。
我有一个自定义的RecordReader类,我试图在其下一个方法中使用globStatus,但它没有成功。
public boolean next(Text key, Text value) throws IOException { Path filePath = fileSplit.getPath(); if (!processed) { key.set(filePath.getName()); byte[] contents = new byte[(int) fileSplit.getLength()]; value.clear(); FileSystem fs = filePath.getFileSystem(conf); fs.globStatus(new Path("/*" + date)); FSDataInputStream in = null; try { in = fs.open(filePath); IOUtils.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); } finally { IOUtils.closeStream(in); } processed = true; return true; } return false; }
我知道它返回一个FileStatus数组,但我如何使用它来过滤文件。 有人可以请一些亮点吗?
globStatus
方法有2个免费参数,允许您过滤文件。 第一个是glob模式,但有时glob模式不足以过滤特定文件,在这种情况下,您可以定义PathFilter
。
关于glob模式,支持以下内容:
Glob | Matches ------------------------------------------------------------------------------------------------------------------- * | Matches zero or more characters ? | Matches a single character [ab] | Matches a single character in the set {a, b} [^ab] | Matches a single character not in the set {a, b} [ab] | Matches a single character in the range [a, b] where a is lexicographically less than or equal to b [^ab] | Matches a single character not in the range [a, b] where a is lexicographically less than or equal to b {a,b} | Matches either expression a or b \c | Matches character c when it is a metacharacter
PathFilter
只是一个这样的接口:
public interface PathFilter { boolean accept(Path path); }
因此,您可以实现此接口并实现accept
方法,您可以将逻辑放在过滤文件中。
Tom White的优秀书籍中的一个示例,它允许您定义PathFilter
以过滤与特定正则表达式匹配的文件:
public class RegexExcludePathFilter implements PathFilter { private final String regex; public RegexExcludePathFilter(String regex) { this.regex = regex; } public boolean accept(Path path) { return !path.toString().matches(regex); } }
您可以在初始化作业时通过调用FileInputFormat.setInputPathFilter(JobConf, RegexExcludePathFilter.class)
直接使用PathFilter
实现过滤输入。
编辑 :由于你必须在setInputPathFilter
传递类,你不能直接传递参数,但你应该能够通过使用Configuration
来做类似的事情。 如果使RegexExcludePathFilter
也从Configured
扩展,您可以使用所需的值返回一个之前已初始化的Configuration
对象,这样您就可以在filter中找回这些值并在accept
处理它们。
例如,如果您初始化如下:
conf.set("date", "2013-01-15");
然后你可以像这样定义你的filter:
public class RegexIncludePathFilter extends Configured implements PathFilter { private String date; private FileSystem fs; public boolean accept(Path path) { try { if (fs.isDirectory(path)) { return true; } } catch (IOException e) {} return path.toString().endsWith(date); } public void setConf(Configuration conf) { if (null != conf) { this.date = conf.get("date"); try { this.fs = FileSystem.get(conf); } catch (IOException e) {} } } }
编辑2 :原始代码存在一些问题,请参阅更新的类。 您还需要删除构造函数,因为它不再使用,并检查是否是一个目录,在这种情况下您应该返回true,以便也可以过滤目录的内容。
对于阅读此内容的任何人,我可以说“请不要在filter中做任何比validation路径更复杂的事情”。 具体来说:不要检查作为目录的文件,获取它们的大小等。等待列表/ glob操作返回,然后使用填充的FileStatus
条目中的信息在那里进行过滤。
为什么? 所有这些直接或通过isDirectory()
调用getFileStatus()
对文件系统进行不必要的调用,这些调用会在HDFS集群上添加不必要的namenode负载。 更重要的是,针对S3和其他对象存储,每个操作都可能发出多个HTTPS请求 – 这些确实需要花费可测量的时间。 更好的是,如果S3认为您在整个机器群集中发出过多请求,S3会限制您。 你不希望这样。
直到调用之后 – 你得到的文件状态条目是来自对象存储列表命令的那些,它通常每个HTTPS请求返回数千个文件条目,因此效率更高。
有关更多详细信息,请检查org.apache.hadoop.fs.s3a.S3AFileSystem
的源代码。