数据集-API模拟JavaSparkContext.wholeTextFiles
我们可以调用JavaSparkContext.wholeTextFiles
并获取JavaPairRDD
,其中第一个String是文件名,第二个String是整个文件内容。 在Dataset API中是否有类似的方法,或者我所能做的就是将文件加载到JavaPairRDD
然后转换为Dataset(这是有效的,但我正在寻找非RDD解决方案)。
如果要使用数据集API,则可以使用spark.read.text("path/to/files/")
。 请在此处查看API详细信息。 请注意,使用text()
方法返回Dataframe,其中“ 文本文件中的每一行都是生成的DataFrame中的新行 ”。 所以text()
方法将提供文件内容。 要获取文件名,您必须使用input_file_name()
函数。
import static org.apache.spark.sql.functions.input_file_name; Dataset ds = spark.read().text("c:\\temp").withColumnRenamed("value", "content").withColumn("fileName", input_file_name()); ds.show(false);
如果要连接同一文件中的行,使其像整个文件内容一样,则需要在fileName列上使用groupBy
函数和concat_ws
和collect_list
函数。
import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.concat_ws; import static org.apache.spark.sql.functions.collect_list; ds = ds.groupBy(col("fileName")).agg(concat_ws("",collect_list(ds.col("content"))).as("content")); ds.show(false);
- 在Apache spark中,使用mapPartitions和组合使用广播变量和map之间的区别是什么
- Spark SQL:镶嵌错误的嵌套类
- 为什么启动StreamingContext失败并出现“IllegalArgumentException:要求失败:没有注册输出操作,所以无需执行”?
- 如何使用Java有效地读取Hadoop(HDFS)文件中的第一行?
- Apache Spark需要5到6分钟才能从Cassandra中简单计算1亿行
- 如何从sparkdataframe列中的数组中提取值
- 如何在Spark RDD(Java)中通过索引获取元素
- 如何使用Spark DataFrame计算Cassandra表的汇总统计量?
- java.lang.NoClassDefFoundError:org / apache / spark / Logging