使用sc.textFile以递归方式从子目录中获取文件内容
似乎SparkContext textFile只希望文件存在于给定的目录位置 – 它也不存在
- (a)递归或
- (b)甚至支持目录(尝试将目录读取为文件)
任何建议如何构造递归 – 可能比手动创建递归文件列表/下降逻辑更简单?
这是用例:文件下
/数据/表/ MY_TABLE
我希望能够通过hdfs调用该父目录下所有目录级别的所有文件。
UPDATE
sc.textFile()通过(子类)TextInputFormat调用Hadoop FileInputFormat。 在逻辑内部存在执行递归目录读取 – 即首先检测条目是否是目录,如果是,则降序:
for (FileStatus globStat: matches) { 218 if (globStat.isDir()) { 219 for(FileStatus stat: fs.listStatus(globStat.getPath(), 220 inputFilter)) { 221 result.add(stat); 222 } 223 } else { 224 result.add(globStat); 225 } 226 }
但是,在调用sc.textFile时,目录条目上存在错误:“not a file”。 这种行为令人困惑 – 因为似乎有适当的支持来处理目录。
我在看旧版的FileInputFormat ..
在设置递归配置之前 mapreduce.input.fileinputformat.input.dir.recursive
scala> sc.textFile("dev/*").count java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build
默认值为null / not set,其值为“false”:
scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive") res1: String = null
后:
现在设置值:
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
现在重试递归操作:
scala>sc.textFile("dev/*/*").count .. res5: Long = 3481 So it works.
@Ben为每条评论添加/ 更新完整递归
我发现必须按以下方式设置这些参数:
.set("spark.hive.mapred.supports.subdirectories","true") .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")
- 是否可以在Apache Spark中创建嵌套的RDD?
- httpclient版本与Apache Spark之间的冲突
- 使用带有ScalaObjectMapper的Jackson模块在Spark 1.4.0上运行作业时出错
- Spark Combinebykey JAVA lambda表达式
- 无法找到Web UI的资源路径:org / apache / spark / ui / static创建Spark应用程序时
- Java中Spark MLlib中的矩阵运算
- Spark – 可以在JAVA中将MultiMap转换为DataFrame
- 如何使用JAVA在Spark DataFrame上调用UDF?
- 数据集-API模拟JavaSparkContext.wholeTextFiles