数据集-API模拟JavaSparkContext.wholeTextFiles

我们可以调用JavaSparkContext.wholeTextFiles并获取JavaPairRDD ,其中第一个String是文件名,第二个String是整个文件内容。 在Dataset API中是否有类似的方法,或者我所能做的就是将文件加载到JavaPairRDD然后转换为Dataset(这是有效的,但我正在寻找非RDD解决方案)。

如果要使用数据集API,则可以使用spark.read.text("path/to/files/") 。 请在此处查看API详细信息。 请注意,使用text()方法返回Dataframe,其中“ 文本文件中的每一行都是生成的DataFrame中的新行 ”。 所以text()方法将提供文件内容。 要获取文件名,您必须使用input_file_name()函数。

 import static org.apache.spark.sql.functions.input_file_name; Dataset ds = spark.read().text("c:\\temp").withColumnRenamed("value", "content").withColumn("fileName", input_file_name()); ds.show(false); 

如果要连接同一文件中的行,使其像整个文件内容一样,则需要在fileName列上使用groupBy函数和concat_wscollect_list函数。

 import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.concat_ws; import static org.apache.spark.sql.functions.collect_list; ds = ds.groupBy(col("fileName")).agg(concat_ws("",collect_list(ds.col("content"))).as("content")); ds.show(false);