Tag: hdfs

Hadoop HDFS MapReduce输出到MongoDb

我想编写Java程序,它从HDFS读取输入,使用MapReduce处理它并将输出写入MongoDb。 这是场景: 我有一个Hadoop集群,它有3个数据节点。 java程序从HDFS读取输入,使用MapReduce处理它。 最后,将结果写入MongoDb。 实际上,从HDFS读取并使用MapReduce处理它很简单。 但我对将结果写入MongoDb感到困惑。 是否支持将Java API写入MongoDB? 另一个问题是,由于它是一个Hadoop集群,所以我们不知道哪个datanode将运行Reducer任务并生成结果,是否可以将结果写入安装在特定服务器上的MongoDb? 如果我想将结果写入HDFS,代码将如下所示: @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable value : values) { sum += value.get(); } context.write(new Text(key), new LongWritable(sum)); } 现在我想将结果写入MongoDb而不是HDFS,我该怎么做?

基于HADOOP_HOME自动加载HDFS配置?

我正在开发一个Java程序来与已经运行的hadoop集群进行交互。 该程序将HADOOP_HOME作为环境变量传递给它。 基于此值,我需要在开始与HDFS / MapReduce交互之前加载所有必需的配置资源。 我认为我需要的文件基于apache文档 。 我目前的解决方案如下: final String HADOOP_HOME = System.getEnv(“HADOOP_HOME”); Configuration conf = new Configuration(); conf.addResource(new Path(HADOOP_HOME, “src/core/core-default.xml”)); conf.addResource(new Path(HADOOP_HOME, “src/hdfs/hdfs-default.xml”)); conf.addResource(new Path(HADOOP_HOME, “src/mapred/mapred-default.xml”)); conf.addResource(new Path(HADOOP_HOME, “conf/core-site.xml”)); conf.addResource(new Path(HADOOP_HOME, “conf/hdfs-site.xml”)); conf.addResource(new Path(HADOOP_HOME, “conf/mapred-site.xml”)); FileSystem hdfs = new FileSystem(conf); 有更清洁的方法吗? 希望这种方式不涉及明确设置每个资源?

使用mapPartition和迭代器保存spark RDD

我有一些中间数据,我需要存储在HDFS和本地。 我正在使用Spark 1.6。 在HDFS中作为中间forms我在/output/testDummy/part-00000和/output/testDummy/part-00001获取数据。 我想使用Java / Scala将这些分区保存在本地,以便我可以将它们保存为/users/home/indexes/index.nt (通过在本地合并)或/users/home/indexes/index-0000.nt和/home/indexes/index-0001.nt分开。 这是我的代码:注意:testDummy与test相同,输出有两个分区。 我想单独存储它们或组合它们但是本地与index.nt文件。 我更喜欢分别存储在两个数据节点中。 我正在使用集群并在YARN上提交spark工作。 我还添加了一些评论,多少次以及我得到的数据。 我该怎么办? 任何帮助表示赞赏。 val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+”/testDummy”) println(“testDummy done”) //1 time print def savesData(iterator: Iterator[(String)]): Iterator[(String)] = { println(“Inside savesData”) // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2 println(“iter size”+iterator.size) // 2 735 2 735 values val filenamesWithExtension = outputPath + “/index.nt” println(“filenamesWithExtension “+filenamesWithExtension.length) //4 […]

只有在完全写入和关闭后才能从HDFS读取文件

我有两个进程在运行。 一种是将文件写入HDFS,另一种是加载这些文件。 第一个进程(写入文件的进程)正在使用: private void writeFileToHdfs(byte[] sourceStream, Path outFilePath) { FSDataOutputStream out = null; try { // create the file out = getFileSystem().create(outFilePath); out.write(sourceStream); } catch (Exception e) { LOG.error(“Error while trying to write a file to hdfs”, e); } finally { try { if (null != out) out.close(); } catch (IOException e) { LOG.error(“Could […]

在Spark 0.9.0上运行作业会引发错误

我安装了Apache Spark 0.9.0群集,我正在尝试部署从HDFS读取文件的代码。 这段代码会发出警告,最终失败。 这是代码 /** * running the code would fail * with a warning * Initial job has not accepted any resources; check your cluster UI to ensure that * workers are registered and have sufficient memory */ object Main extends App { val sconf = new SparkConf() .setMaster(“spark://labscs1:7077”) .setAppName(“spark scala”) val sctx […]

保存Hadoop中的Mapper输出的位置?

我有兴趣有效地管理Hadoop混洗流量并有效利用网络带宽。 为此,我想知道每个Datanode产生的混乱流量是多少? 洗牌流量只不过是映射器的输出。 那么这个映射器输出保存在哪里? 如何实时获取每个数据节点的映射器输出大小? 感谢您的帮助。 我已经创建了一个目录来存储这个mapper输出,如下所示。 mapred.local.dir /app/hadoop/tmp/myoutput 我看着 hduser@dn4:/app/hadoop/tmp/myoutput$ ls -lrt total 16 drwxr-xr-x 2 hduser hadoop 4096 Dec 12 10:50 tt_log_tmp drwx—— 3 hduser hadoop 4096 Dec 12 10:53 ttprivate drwxr-xr-x 3 hduser hadoop 4096 Dec 12 10:53 taskTracker drwxr-xr-x 4 hduser hadoop 4096 Dec 12 13:25 userlogs 当我运行mapreduce工作时,我无法在这里找到任何东西。 谢谢

如何将Jar文件传递给OOZIE shell节点中的shell脚本

嗨我在脚本中运行java程序时遇到错误,该脚本正在oozie shell action workflow中执行。 Stdoutput 2015-08-25 03:36:02,636 INFO [pool-1-thread-1] (ProcessExecute.java:68) – Exception in thread “main” java.io.IOException: Error opening job jar: /tmp/jars/first.jar Stdoutput 2015-08-25 03:36:02,636 INFO [pool-1-thread-1] (ProcessExecute.java:68) – at org.apache.hadoop.util.RunJar.main(RunJar.java:124) Stdoutput 2015-08-25 03:36:02,636 INFO [pool-1-thread-1] (ProcessExecute.java:68) – Caused by: java.io.FileNotFoundException: /tmp/jars/first.jar (No such file or directory) Stdoutput 2015-08-25 03:36:02,636 INFO [pool-1-thread-1] (ProcessExecute.java:68) – at java.util.zip.ZipFile.open(Native […]

Datanode守护程序未在Hadoop 2.5.0上运行

我在一台机器上设置Hadoop 2.5.0,我遇到的问题是没有运行的datanode,如jps命令的输出所示: $ jps 3404 Jps 2661 NodeManager 2606 ResourceManager 2484 NameNode 当我尝试手动运行它时我得到了这个: $HADOOP_HOME/sbin/hadoop-daemon.sh start datanode starting datanode, logging to /home/arbi/Programs/hadoop-2.5.0/logs/hadoop-arbi-datanode-ElOued.out 然后仍然没有,这里是hadoop-arbi-datanode-ElOued.out : ulimit -a for user arbi core file size (blocks, -c) 0 data seg size (kbytes, -d) unlimited scheduling priority (-e) 0 file size (blocks, -f) unlimited pending signals (-i) 15862 max locked […]

Hadoop:如何将reducer输出合并到一个文件?

我知道shell中的“getmerge”命令可以完成这项工作。 但是,如果我想在作业之后通过HDFS API for java合并这些输出,我该怎么办? 我真正想要的是HDFS上的单个合并文件。 我唯一能想到的就是在那之后开始一项额外的工作。 谢谢!

无法使用本地hadoop连接azure blob存储

在尝试使用Hadoop版本2.7.1将本地hadoop与AZURE BLOB存储(即使用blob存储作为HDFS )连接时,它会抛出exception 在这里,我通过设置属性成功地形成了本地群集 fs.default.name wasb://account@storage.blob.core.windows.net 然后是core-site.xml中blob存储的关键值。 列出文件或对blob存储进行HDFS操作时 ,将以下exception作为 ls: No FileSystem for scheme: wasb 有人请指导我解决上述问题。