Read whole text files from compressed archives in Spark

I have the following problem: suppose I have a directory on HDFS that contains compressed archives, each of which holds several files. I want to create an RDD consisting of objects of some type T, i.e.:

    context = new JavaSparkContext(conf);

    JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);

    JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
        // The name of the file
        String fileName = fileNameContent._1();
        // The content of the file
        String content = fileNameContent._2();

        // Class T has a constructor taking the file name and the content of each
        // processed file (as two strings)
        T t = new T(content, fileName);

        return t;
    });

Now, when inputDataPath is a directory containing files, this works perfectly fine, i.e. when it is something like:

 String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders 

But when the input is a tgz containing multiple files, the file content (fileNameContent._2()) gives me a useless binary string (as expected). I found a similar question on SO, but it is not the same case, because there the solution is for archives that contain only a single file, whereas in my case each archive holds many files that I want to read individually, each as a whole file. I also found a question about wholeTextFiles, but that does not work in my case.

Any idea how to do this?

EDIT:

I tried the reader from here (testing it as in the function testTarballWithFolders()), but whenever I call

 TarballReader tarballReader = new TarballReader(fileName); 

I get a NullPointerException:

    java.lang.NullPointerException
        at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
        at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
        at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
        at utils.TarballReader.<init>(TarballReader.java:61)
        at main.SparkMain.lambda$0(SparkMain.java:105)
        at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Line 105 in SparkMain is the one I showed above in the edit of this post, and line 61 in TarballReader is

 GZIPInputStream gzip = new GZIPInputStream(in); 

where the input stream in turns out to be null; it is obtained in the preceding line:

 InputStream in = this.getClass().getResourceAsStream(tarball); 

Am I on the right path here? If so, how do I proceed? Why am I getting this null value, and how can I fix it?
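(One plausible reason for the null, not confirmed in the original post: getResourceAsStream resolves its argument against the classpath, not against the local file system or HDFS, so for a plain file path it returns null, and the GZIPInputStream constructor then throws the NullPointerException shown above. A minimal sketch of the distinction, using a hypothetical helper class and assuming the tarball path is an HDFS path:)

    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class StreamSources {

        public InputStream openTarball(String tarball) throws Exception {
            // getResourceAsStream only finds resources packaged on the classpath
            // (e.g. inside the application jar); for any other path it returns null.
            InputStream fromClasspath = getClass().getResourceAsStream(tarball);
            if (fromClasspath != null) {
                return fromClasspath;
            }

            // A file that lives on HDFS has to be opened through the Hadoop
            // FileSystem API instead (hypothetical path such as
            // "hdfs://some_path/archive.tar.gz").
            FileSystem fs = FileSystem.get(new Configuration());
            return fs.open(new Path(tarball));
        }
    }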

One possible solution is to read the data with binaryFiles and extract the content manually.

Scala

    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
    import org.apache.spark.input.PortableDataStream
    import scala.util.Try
    import java.nio.charset._

    def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
      val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
      Stream.continually(Option(tar.getNextTarEntry))
        // Read until next entry is null
        .takeWhile(_.isDefined)
        // flatten
        .flatMap(x => x)
        // Drop directories
        .filter(!_.isDirectory)
        .map(e => {
          Stream.continually {
            // Read n bytes
            val buffer = Array.fill[Byte](n)(-1)
            val i = tar.read(buffer, 0, n)
            (i, buffer.take(i))
          }
            // Take as long as we've read something
            .takeWhile(_._1 > 0)
            .map(_._2)
            .flatten
            .toArray
        })
        .toArray
    }

    def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
      new String(bytes, charset)

    sc.binaryFiles("somePath")
      .flatMapValues(x => extractFiles(x).toOption)
      .mapValues(_.map(decode()))
The required dependency (sbt coordinates):

    libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"

Full Java usage example: https://bitbucket.org/zero323/spark-multifile-targz-extract/src
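Since the question itself uses the Java API, here is a minimal, self-contained Java sketch of the same binaryFiles plus commons-compress approach; the class name TarGzExtract and its method are illustrative only and are not taken from the linked repository:

    import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
    import org.apache.commons.compress.utils.IOUtils;
    import org.apache.spark.input.PortableDataStream;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class TarGzExtract {

        // Decompresses a .tar.gz delivered as a PortableDataStream and returns the
        // content of every regular file in it as a UTF-8 string.
        public static List<String> extractFiles(PortableDataStream ps) throws IOException {
            List<String> contents = new ArrayList<>();
            try (TarArchiveInputStream tar =
                     new TarArchiveInputStream(new GzipCompressorInputStream(ps.open()))) {
                TarArchiveEntry entry;
                while ((entry = tar.getNextTarEntry()) != null) {
                    if (entry.isDirectory()) {
                        continue; // skip directory entries
                    }
                    // TarArchiveInputStream reports end-of-stream at the end of the
                    // current entry, so this reads exactly the bytes of that entry
                    byte[] bytes = IOUtils.toByteArray(tar);
                    contents.add(new String(bytes, StandardCharsets.UTF_8));
                }
            }
            return contents;
        }
    }

It could then be used from the JavaSparkContext of the question roughly like this:

    JavaPairRDD<String, List<String>> extracted =
        context.binaryFiles(inputDataPath)
               .mapValues(TarGzExtract::extractFiles);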

Python

    import tarfile
    from io import BytesIO

    def extractFiles(bytes):
        tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
        return [tar.extractfile(x).read() for x in tar if x.isfile()]

    (sc.binaryFiles("somePath")
        .mapValues(extractFiles)
        .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))