Getting file names from Spark Streaming DStream RDDs

Spark Streaming's textFileStream and fileStream can monitor a directory and process the new files as they show up in the DStream's RDDs.
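For context, here is a minimal sketch of that plain API (the directory path and batch interval are placeholders, not from any specific setup):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("Watch directory").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(30))

    // Each batch interval, the DStream contains the lines of files newly
    // added to the monitored directory, but not the file names themselves.
    val lines = ssc.textFileStream("/some/directory")
    lines.print()

    ssc.start()
    ssc.awaitTermination()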

How can you find out which file names a DStream RDD is processing in a given batch interval?

fileStream produces a UnionRDD of NewHadoopRDDs. The nice part about the NewHadoopRDDs created by sc.newAPIHadoopFile is that their name is set to their path.
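As a quick illustration (a sketch of mine, not part of the original answer, using the same imports as the full example below), you can see those names by walking each batch RDD's dependencies:

    // Print the path-derived name of every per-file RDD inside the batch's UnionRDD.
    ssc.fileStream[LongWritable, Text, TextInputFormat]("/some/directory")
      .foreachRDD { rdd =>
        rdd.dependencies.foreach(dep => println(dep.rdd.name))
      }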

Here is an example of what you can do with that knowledge:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.{RDD, UnionRDD}
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.reflect.ClassTag

    // Re-wrap the underlying UnionRDD so each per-file RDD keeps its path as its name.
    def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
      ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
        .transform( rdd =>
          new UnionRDD(rdd.context,
            rdd.dependencies.map( dep =>
              dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]]
                .map(_._2.toString)
                .setName(dep.rdd.name)
            )
          )
        )

    // Apply a per-file transformation, passing each file's name to the function.
    def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                     transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
      new UnionRDD(unionrdd.context,
        unionrdd.dependencies.map{ dep =>
          if (dep.rdd.isEmpty) None
          else {
            val filename = dep.rdd.name
            Some(
              transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
                .setName(filename)
            )
          }
        }.flatten
      )
    }

    def main(args: Array[String]) = {
      val conf = new SparkConf()
        .setAppName("Process by file")
        .setMaster("local[2]")

      val ssc = new StreamingContext(conf, Seconds(30))

      val dstream = namedTextFileStream(ssc, "/some/directory")

      def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
        rdd.map(line => (filename, line))

      val transformed = dstream.transform(rdd => transformByFile(rdd, byFileTransformer))

      // Do some stuff with transformed

      ssc.start()
      ssc.awaitTermination()
    }
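The "Do some stuff with transformed" comment is left open in the answer; as one hypothetical continuation, you could count lines per file in each batch:

    // Hypothetical continuation: count the lines contributed by each file per batch.
    transformed
      .map { case (filename, _) => (filename, 1L) }
      .reduceByKey(_ + _)
      .print()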

For those who need some Java code instead of Scala:

    JavaPairInputDStream<LongWritable, Text> textFileStream =
        jsc.fileStream(
            inputPath,
            LongWritable.class,
            Text.class,
            TextInputFormat.class,
            FileInputDStream::defaultFilter,
            false
        );

    JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> {
        UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();

        List<RDD<Tuple2<LongWritable, Text>>> deps =
            JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();

        // Rename each non-empty per-file RDD to (filename, line) pairs.
        List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map(depRdd -> {
            if (depRdd.isEmpty()) {
                return null;
            }
            JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
            String filename = depRdd.name();
            JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd)
                .mapToPair(t -> new Tuple2<>(filename, t._2().toString()))
                .setName(filename);
            return newDep.rdd();
        }).filter(t -> t != null).collect(Collectors.toList());

        Seq<RDD<Tuple2<String, String>>> rddSeq =
            JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();

        ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);

        return new UnionRDD<Tuple2<String, String>>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD();
    });