使用mapPartition和迭代器保存spark RDD

我有一些中间数据,我需要存储在HDFS和本地。 我正在使用Spark 1.6。 在HDFS中作为中间forms我在/output/testDummy/part-00000/output/testDummy/part-00001获取数据。 我想使用Java / Scala将这些分区保存在本地,以便我可以将它们保存为/users/home/indexes/index.nt (通过在本地合并)或/users/home/indexes/index-0000.nt/home/indexes/index-0001.nt分开。

这是我的代码:注意:testDummy与test相同,输出有两个分区。 我想单独存储它们或组合它们但是本地与index.nt文件。 我更喜欢分别存储在两个数据节点中。 我正在使用集群并在YARN上提交spark工作。 我还添加了一些评论,多少次以及我得到的数据。 我该怎么办? 任何帮助表示赞赏。

  val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy") println("testDummy done") //1 time print def savesData(iterator: Iterator[(String)]): Iterator[(String)] = { println("Inside savesData") // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2 println("iter size"+iterator.size) // 2 735 2 735 values val filenamesWithExtension = outputPath + "/index.nt" println("filenamesWithExtension "+filenamesWithExtension.length) //4 times var list = List[(String)]() val fileWritter = new FileWriter(filenamesWithExtension,true) val bufferWritter = new BufferedWriter(fileWritter) while (iterator.hasNext){ //iterator.hasNext is false println("inside iterator") //0 times val dat = iterator.next() println("datadata "+iterator.next()) bufferWritter.write(dat + "\n") bufferWritter.flush() println("index files written") val dataElements = dat.split(" ") println("dataElements") //0 list = list.::(dataElements(0)) list = list.::(dataElements(1)) list = list.::(dataElements(2)) } bufferWritter.close() //closing println("savesData method end") //4 times when coal=2 list.iterator } println("before saving data into local") //1 val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData) println("testRDD partitions "+test.getNumPartitions) //2 println("testRDD size "+test.collect().length) //0 println("after saving data into local") //1 

PS:我跟着, 这个和这个但不完全相同我正在寻找,我做了某种方式但没有得到任何index.nt

有几件事:

  • 如果您打算稍后使用数据,请不要调用Iterator.sizeIteratorsTraversableOnce 。 计算Iterator大小的唯一方法是遍历其所有元素,之后不再需要读取数据。
  • 不要将mapPartitionsmapPartitions用于副作用。 如果要执行某些类型的IO使用操作,例如foreach / foreachPartition 。 这是一种不好的做法,并不保证给定的代码片段只执行一次。
  • 动作或转换中的本地路径是特定工作者的本地路径。 如果要直接在客户端计算机上编写,则应首先使用collecttoLocalIterator获取数据。 尽管写入分布式存储并稍后获取数据可能会更好。

Java 7提供了查看目录的方法。

https://docs.oracle.com/javase/tutorial/essential/io/notification.html

我们的想法是创建一个监视服务,将其注册到感兴趣的目录(提及您感兴趣的事件,如文件创建,删除等),观察,您将收到任何事件的通知,如创建,删除,等等,你可以采取你想要的任何行动。

只要适用,您将不得不严重依赖Java hdfs api。

在后台运行程序,因为它永远等待事件。 (你可以在你做任何你想做的事后写逻辑来退出)

另一方面,shell脚本也会有所帮助。

在读取文件时要注意hdfs文件系统的一致性模型。

希望这有助于一些想法。