如何在Hadoop 3.0中进行CopyMerge?
我知道hadoop
版本2.7
的copyMerge
有copyMerge
函数,它将多个文件合并为一个新文件。
但是3.0
版本中的API不再支持copyMerge
函数。
有关如何将目录中的所有文件合并到3.0
版本的hadoop中的新单个文件的任何想法?
FileUtil#copyMerge方法已被删除。 查看主要更改的详细信息:
https://issues.apache.org/jira/browse/HADOOP-12967
https://issues.apache.org/jira/browse/HADOOP-11392
你可以使用getmerge
用法:hadoop fs -getmerge [-nl]
将源目录和目标文件作为输入,并将src中的文件连接到目标本地文件。 可选地,-nl可以设置为在每个文件的末尾添加换行符(LF)。 -skip-empty-file可用于在空文件的情况下避免不需要的换行符。
例子:
hadoop fs -getmerge -nl /src /opt/output.txt hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt
退出代码:成功时返回0,错误时返回非零。
我有同样的问题,不得不重新实现copyMerge(虽然在PySpark中,但使用与原始copyMerge相同的API调用)。
不知道为什么在Hadoop 3中没有相同的function。我们必须经常将HDFS目录中的文件合并到HDFS文件中。
这是我在上面引用的pySpark中的实现https://github.com/Tagar/stuff/blob/master/copyMerge.py
由于FileUtil.copyMerge()
已被弃用并从API开始版本3中删除,因此一个简单的解决方案就是自己重新实现它。
这是以前版本的Java原始实现。
这是一个Scala重写:
import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.IOUtils import java.io.IOException def copyMerge( srcFS: FileSystem, srcDir: Path, dstFS: FileSystem, dstFile: Path, deleteSource: Boolean, conf: Configuration ): Boolean = { if (dstFS.exists(dstFile)) throw new IOException(s"Target $dstFile already exists") // Source path is expected to be a directory: if (srcFS.getFileStatus(srcDir).isDirectory()) { val outputFile = dstFS.create(dstFile) Try { srcFS .listStatus(srcDir) .sortBy(_.getPath.getName) .collect { case status if status.isFile() => val inputFile = srcFS.open(status.getPath()) Try(IOUtils.copyBytes(inputFile, outputFile, conf, false)) inputFile.close() } } outputFile.close() if (deleteSource) srcFS.delete(srcDir, true) else true } else false }
- 为什么YARN java堆空间内存错误?
- Hadoop – 直接从Mapper写入HBase
- 在HADOOP地图中使用generics可以减少问题
- 如何使用java api直接发送hbase shell命令,如jdbc?
- Mapreduce wordcount作业中找不到类的exception
- Hadoop 1.2.1 – 多节点集群 – 对于Wordcount程序,Reducer阶段是否挂起?
- 使用MultithreadMapper替换Mapper时,键入地图中的键不匹配
- 如何设置一个reducer来发送和一个mapper来接收?
- 使用saveAsTextFile的Spark NullPointerException