How to list all files in a directory and its subdirectories in Hadoop HDFS

I have a folder in HDFS that has two subfolders, each of which has about 30 subfolders, and each of those in turn contains xml files. I want to list all the xml files, giving only the path of the main folder. Locally I can do this with Apache commons-io's FileUtils.listFiles(). I tried this

    FileStatus[] status = fs.listStatus(new Path(args[0]));

but it only lists the first two subfolders and does not go any deeper. Is there a way to do this in Hadoop?

You need to use the FileSystem object and perform some logic on the resulting FileStatus objects to manually recurse into the subdirectories.

You can also apply a PathFilter to return only the xml files, using the listStatus(Path, PathFilter) method.

The Hadoop FsShell class has an example of this for the hadoop fs -lsr command, which is a recursive ls – see the source code, around line 590 (the recursive step is triggered around line 635).
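Putting those two pieces together, here is a minimal sketch (not the FsShell source itself) that recurses manually and applies a PathFilter so that only .xml files are collected; the class and method names are placeholders:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class XmlLister {

        // keep only *.xml files; directories are handled by the recursion itself
        private static final PathFilter XML_FILTER = new PathFilter() {
            @Override
            public boolean accept(Path path) {
                return path.getName().endsWith(".xml");
            }
        };

        // manually recurse into subdirectories, collecting matching file paths
        public static void collectXmlFiles(FileSystem fs, Path dir, List<Path> result) throws IOException {
            for (FileStatus status : fs.listStatus(dir)) {
                if (status.isDirectory()) {
                    collectXmlFiles(fs, status.getPath(), result);
                } else if (XML_FILTER.accept(status.getPath())) {
                    result.add(status.getPath());
                }
            }
        }

        public static void main(String[] args) throws IOException {
            FileSystem fs = FileSystem.get(new Configuration());
            List<Path> xmlFiles = new ArrayList<Path>();
            collectXmlFiles(fs, new Path(args[0]), xmlFiles);  // args[0] = main folder, as in the question
            for (Path p : xmlFiles) {
                System.out.println(p);
            }
        }
    }

Note that the filter is applied to files only rather than passed straight to listStatus(), because a name-based filter would otherwise also exclude the subdirectories that still need to be descended into.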

If you are using the Hadoop 2.* API, there is a more elegant solution:

    Configuration conf = getConf();
    Job job = Job.getInstance(conf);
    FileSystem fs = FileSystem.get(conf);

    // the second boolean parameter here sets the recursion to true
    RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
            new Path("path/to/lib"), true);
    while (fileStatusListIterator.hasNext()) {
        LocatedFileStatus fileStatus = fileStatusListIterator.next();
        // do stuff with the file like ...
        job.addFileToClassPath(fileStatus.getPath());
    }
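Since the question is specifically about xml files, the iterator's results can additionally be filtered by file name. A small variant of the loop above (assuming conf, job and fs are set up exactly as in the preceding snippet):

    // keep only *.xml files from the recursive listing
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("path/to/lib"), true);
    while (it.hasNext()) {
        LocatedFileStatus fileStatus = it.next();
        if (fileStatus.getPath().getName().endsWith(".xml")) {
            job.addFileToClassPath(fileStatus.getPath());
        }
    }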

Have you tried this:

    import java.io.*;
    import java.util.*;
    import java.net.*;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class cat {
        public static void main(String[] args) throws Exception {
            try {
                FileSystem fs = FileSystem.get(new Configuration());
                // you need to pass in your hdfs path
                FileStatus[] status = fs.listStatus(new Path("hdfs://test.com:9000/user/test/in"));
                for (int i = 0; i < status.length; i++) {
                    // print the contents of each file found in the directory
                    BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
                    String line = br.readLine();
                    while (line != null) {
                        System.out.println(line);
                        line = br.readLine();
                    }
                }
            } catch (Exception e) {
                System.out.println("File not found");
            }
        }
    }

    /**
     * @param filePath
     * @param fs
     * @return list of absolute file path present in given path
     * @throws FileNotFoundException
     * @throws IOException
     */
    public static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
        List<String> fileList = new ArrayList<String>();
        FileStatus[] fileStatus = fs.listStatus(filePath);
        for (FileStatus fileStat : fileStatus) {
            if (fileStat.isDirectory()) {
                fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
            } else {
                fileList.add(fileStat.getPath().toString());
            }
        }
        return fileList;
    }

Quick example: suppose you have the following file structure:

    a -> b
      -> c -> d
           -> e
      -> d -> f

Using the code above, you will get:

    a/b
    a/c/d
    a/c/e
    a/d/f

If you only want the leaves (i.e. the file names), use the following code in the else block:

    ...
    } else {
        String fileName = fileStat.getPath().toString();
        fileList.add(fileName.substring(fileName.lastIndexOf("/") + 1));
    }

This will give:

    b
    d
    e
    f
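A hypothetical call site for getAllFilePath above, assuming the main folder path from the question is passed in (the path string is a placeholder):

    // hypothetical usage of getAllFilePath; the path below is a placeholder
    FileSystem fs = FileSystem.get(new Configuration());
    List<String> allFiles = getAllFilePath(new Path("/user/test/mainFolder"), fs);
    for (String file : allFiles) {
        System.out.println(file);
    }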

Here is a code snippet that counts the number of files in a particular HDFS directory (I used it to determine how many reducers to use in a particular piece of ETL code). You can easily modify it to suit your needs.

    private int calculateNumberOfReducers(String input) throws IOException {
        int numberOfReducers = 0;
        Path inputPath = new Path(input);
        FileSystem fs = inputPath.getFileSystem(getConf());
        FileStatus[] statuses = fs.globStatus(inputPath);
        for (FileStatus status : statuses) {
            if (status.isDirectory()) {
                numberOfReducers += getNumberOfInputFiles(status, fs);
            } else if (status.isFile()) {
                numberOfReducers++;
            }
        }
        return numberOfReducers;
    }

    /**
     * Recursively determines number of input files in an HDFS directory
     *
     * @param status instance of FileStatus
     * @param fs instance of FileSystem
     * @return number of input files within particular HDFS directory
     * @throws IOException
     */
    private int getNumberOfInputFiles(FileStatus status, FileSystem fs) throws IOException {
        int inputFileCount = 0;
        if (status.isDirectory()) {
            FileStatus[] files = fs.listStatus(status.getPath());
            for (FileStatus file : files) {
                inputFileCount += getNumberOfInputFiles(file, fs);
            }
        } else {
            inputFileCount++;
        }
        return inputFileCount;
    }

Thanks to Radu Adrian Moldovan for the suggestion.

Here is an implementation using a queue:

    private static List<String> listAllFilePath(Path hdfsFilePath, FileSystem fs) throws FileNotFoundException, IOException {
        List<String> filePathList = new ArrayList<String>();
        Queue<Path> fileQueue = new LinkedList<Path>();
        fileQueue.add(hdfsFilePath);
        while (!fileQueue.isEmpty()) {
            Path filePath = fileQueue.remove();
            if (fs.isFile(filePath)) {
                filePathList.add(filePath.toString());
            } else {
                FileStatus[] fileStatus = fs.listStatus(filePath);
                for (FileStatus fileStat : fileStatus) {
                    fileQueue.add(fileStat.getPath());
                }
            }
        }
        return filePathList;
    }

Nowadays, one can use Spark to do the same thing, and it is faster than other approaches (such as Hadoop MR). Here is the code snippet.

    def traverseDirectory(filePath: String, recursiveTraverse: Boolean, filePaths: ListBuffer[String]) {
      val files = FileSystem.get(sparkContext.hadoopConfiguration).listStatus(new Path(filePath))
      files.foreach { fileStatus =>
        if (!fileStatus.isDirectory() && fileStatus.getPath().getName().endsWith(".xml")) {
          filePaths += fileStatus.getPath().toString()
        } else if (fileStatus.isDirectory()) {
          traverseDirectory(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
        }
      }
    }

Do not use a recursive approach (heap issues) :) Use a queue:

    queue.add(param_dir)
    while (queue is not empty) {
        directory = queue.pop
        - get items from current directory
        - if item is a file, add it to a list (final list)
        - if item is a directory => queue.push
    }

It is that easy, enjoy!

A code snippet covering both the recursive and the non-recursive approach:

    //helper method to get the list of files from the HDFS path
    public static List<String> listFilesFromHDFSPath(Configuration hadoopConfiguration, String hdfsPath, boolean recursive)
            throws IOException, IllegalArgumentException {
        //resulting list of files
        List<String> filePaths = new ArrayList<String>();

        //get path from string and then the filesystem
        Path path = new Path(hdfsPath);  //throws IllegalArgumentException
        FileSystem fs = path.getFileSystem(hadoopConfiguration);

        //if recursive approach is requested
        if (recursive) {
            //(heap issues with recursive approach) => using a queue
            Queue<Path> fileQueue = new LinkedList<Path>();

            //add the obtained path to the queue
            fileQueue.add(path);

            //while the fileQueue is not empty
            while (!fileQueue.isEmpty()) {
                //get the file path from queue
                Path filePath = fileQueue.remove();

                //filePath refers to a file
                if (fs.isFile(filePath)) {
                    filePaths.add(filePath.toString());
                } else {  //else filePath refers to a directory
                    //list paths in the directory and add to the queue
                    FileStatus[] fileStatuses = fs.listStatus(filePath);
                    for (FileStatus fileStatus : fileStatuses) {
                        fileQueue.add(fileStatus.getPath());
                    } // for
                } // else
            } // while
        } // if
        else {  //non-recursive approach => no heap overhead
            //if the given hdfsPath is actually a directory
            if (fs.isDirectory(path)) {
                FileStatus[] fileStatuses = fs.listStatus(path);

                //loop all file statuses
                for (FileStatus fileStatus : fileStatuses) {
                    //if the given status is a file, then update the resulting list
                    if (fileStatus.isFile())
                        filePaths.add(fileStatus.getPath().toString());
                } // for
            } // if
            else {  //it is a file then
                //return the one and only file path to the resulting list
                filePaths.add(path.toString());
            } // else
        } // else

        //close filesystem; no more operations
        fs.close();

        //return the resulting list
        return filePaths;
    } // listFilesFromHDFSPath
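A hypothetical call site for listFilesFromHDFSPath above (the HDFS URI is a placeholder), filtering the result down to the xml files asked about in the question:

    // hypothetical usage of listFilesFromHDFSPath; the URI below is a placeholder
    Configuration conf = new Configuration();
    List<String> paths = listFilesFromHDFSPath(conf, "hdfs://namenode:9000/user/test/mainFolder", true);
    for (String p : paths) {
        if (p.endsWith(".xml")) {  // keep only the xml files
            System.out.println(p);
        }
    }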