如何在Hadoop 3.0中进行CopyMerge?

我知道hadoop版本2.7copyMergecopyMerge函数,它将多个文件合并为一个新文件。

但是3.0版本中的API不再支持copyMerge函数。

有关如何将目录中的所有文件合并到3.0版本的hadoop中的新单个文件的任何想法?

FileUtil#copyMerge方法已被删除。 查看主要更改的详细信息:

https://issues.apache.org/jira/browse/HADOOP-12967

https://issues.apache.org/jira/browse/HADOOP-11392

你可以使用getmerge

用法:hadoop fs -getmerge [-nl]

将源目录和目标文件作为输入,并将src中的文件连接到目标本地文件。 可选地,-nl可以设置为在每个文件的末尾添加换行符(LF)。 -skip-empty-file可用于在空文件的情况下避免不需要的换行符。

例子:

 hadoop fs -getmerge -nl /src /opt/output.txt hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt 

退出代码:成功时返回0,错误时返回非零。

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#getmerge

我有同样的问题,不得不重新实现copyMerge(虽然在PySpark中,但使用与原始copyMerge相同的API调用)。

不知道为什么在Hadoop 3中没有相同的function。我们必须经常将HDFS目录中的文件合并到HDFS文件中。

这是我在上面引用的pySpark中的实现https://github.com/Tagar/stuff/blob/master/copyMerge.py

由于FileUtil.copyMerge()已被弃用并从API开始版本3中删除,因此一个简单的解决方案就是自己重新实现它。

这是以前版本的Java原始实现。

这是一个Scala重写:

 import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.IOUtils import java.io.IOException def copyMerge( srcFS: FileSystem, srcDir: Path, dstFS: FileSystem, dstFile: Path, deleteSource: Boolean, conf: Configuration ): Boolean = { if (dstFS.exists(dstFile)) throw new IOException(s"Target $dstFile already exists") // Source path is expected to be a directory: if (srcFS.getFileStatus(srcDir).isDirectory()) { val outputFile = dstFS.create(dstFile) Try { srcFS .listStatus(srcDir) .sortBy(_.getPath.getName) .collect { case status if status.isFile() => val inputFile = srcFS.open(status.getPath()) Try(IOUtils.copyBytes(inputFile, outputFile, conf, false)) inputFile.close() } } outputFile.close() if (deleteSource) srcFS.delete(srcDir, true) else true } else false }