Tag: hadoop

Hadoop HDFS MapReduce输出到MongoDb

我想编写Java程序,它从HDFS读取输入,使用MapReduce处理它并将输出写入MongoDb。 这是场景: 我有一个Hadoop集群,它有3个数据节点。 java程序从HDFS读取输入,使用MapReduce处理它。 最后,将结果写入MongoDb。 实际上,从HDFS读取并使用MapReduce处理它很简单。 但我对将结果写入MongoDb感到困惑。 是否支持将Java API写入MongoDB? 另一个问题是,由于它是一个Hadoop集群,所以我们不知道哪个datanode将运行Reducer任务并生成结果,是否可以将结果写入安装在特定服务器上的MongoDb? 如果我想将结果写入HDFS,代码将如下所示: @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable value : values) { sum += value.get(); } context.write(new Text(key), new LongWritable(sum)); } 现在我想将结果写入MongoDb而不是HDFS,我该怎么做?

ClassNotFoundException在修改后的SimpleShortestPathsVertex上运行GiraphRunner

我对Giraph比较陌生,我正在努力让我的Giraph edit-compile-deploy循环适用于我们的代码。 我能够运行各种灵感来自http://blog.cloudera.com/blog/2014/02/how-to-write-and-run-giraph-jobs-on-hadoop/的例子,但我坚持不懈运行我的SimpleShortestPathsVertex Giraph示例的修改版本时出现ClassNotFoundException。 我已经尝试过-libjars和HADOOP_CLASSPATH的各种组合,但我没有想法,我真的很感谢你的帮助。 细节如下。 版本 Hadoop:Hadoop 2.0.0-cdh4.4.0 Giraph:giraph-examples-1.0.0-for-hadoop-2.0.0-alpha-jar-with-dependencies.jar PageRankBenchmark运行正常 $ hadoop jar $GIRAPH_HOME/giraph-examples/target/giraph-examples-1.0.0-for-hadoop-2.0.0-alpha-jar-with-dependencies.jar \ org.apache.giraph.benchmark.PageRankBenchmark \ -Dgiraph.zkList=:2181 \ -e 1 -s 3 -v -V 50 -w 1 … 14/08/01 11:42:44 INFO mapred.JobClient: Job complete: job_201407291058_0015 … (full output is below) GiraphRunner SimpleShortestPathsVertex也运行正常 $ hadoop jar $GIRAPH_HOME/giraph-examples/target/giraph-examples-1.0.0-for-hadoop-2.0.0-alpha-jar-with-dependencies.jar \ org.apache.giraph.GiraphRunner \ -Dgiraph.zkList=:2181 \ org.apache.giraph.examples.SimpleShortestPathsVertex \ […]

Spring Boot YARN无法在Hadoop上运行2.8.0客户端无法访问DataNode

我正在尝试运行Spring Boot YARN示例(Windows上的https://spring.io/guides/gs/yarn-basic/ )。 在application.yml我将fsUri和resourceManagerHost更改为指向我的VM的主机192.168… 但是,当我试图运行应用程序Exceprion出现时: DFSClient: Exception in createBlockOutputStream java.net.ConnectException: Connection timed out: no further information at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531) at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1508) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1284) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1237) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449) [2017-05-27 19:59:49.570] boot – 7728 INFO [Thread-5] — DFSClient: Abandoning BP-646365587-10.0.2.15-1495898351938:blk_1073741830_1006 [2017-05-27 19:59:49.602] boot – 7728 INFO [Thread-5] — DFSClient: Excluding […]

SPARK到HBase写作

我的SPARK计划的流程如下: 驱动程序 – >创建Hbase连接 – >广播Hbase句柄现在从执行程序,我们获取此句柄并尝试写入hbase 在Driver程序中,我正在创建HBase conf对象和Connection Object,然后通过JavaSPARK Context广播它,如下所示: SparkConf sparkConf = JobConfigHelper.getSparkConfig(); Configuration conf = new Configuration(); UserGroupInformation.setConfiguration(conf); jsc = new JavaStreamingContext(sparkConf, Durations.milliseconds(Long.parseLong(batchDuration))); Configuration hconf=HBaseConfiguration.create(); hconf.addResource(new Path(“/etc/hbase/conf/core-site.xml”)); hconf.addResource(new Path(“/etc/hbase/conf/hbase-site.xml”)); UserGroupInformation.setConfiguration(hconf); JavaSparkContext js = jsc.sparkContext(); Connection connection = ConnectionFactory.createConnection(hconf); connectionbroadcast=js.broadcast(connection); 执行器的内部call()方法, Table table = connectionbroadcast.getValue().getTable(TableName.valueOf(“gfttsdgn:FRESHHBaseRushi”)) ; Put p = new Put(Bytes.toBytes(“row1”)); p.add(Bytes.toBytes(“c1”), Bytes.toBytes(“output”), Bytes.toBytes(“rohan”)); […]

Pipeling hadoop map减少了工作量

我有五个map reduce我分别运行每个。 我想把它们一起管道。 因此,一份工作的输出转到下一份工作。 目前,我编写了shell脚本来执行它们。 有没有办法在java中写这个? 请举个例子。 谢谢

MapReduce查找字长频率

我是MapReduce的新手,我想问一下是否有人可以使用MapReduce给我一个执行字长的频率的想法。 我已经有了字数的代码但是我想使用字长,这是我到目前为止所拥有的。 public class WordCount { public static class Map extends Mapper { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } […]

多节点hadoop集群中的Apache Spark Sql问题

嗨,我使用Spark java apis从hive获取数据。 此代码适用于hadoop单节点集群。 但是当我尝试在hadoop多节点集群中使用它时,它会抛出错误 org.apache.spark.SparkException: Detected yarn-cluster mode, but isn’t running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit. 注意:我已将master作为本地用于单节点,而yarn-cluster用于多节点。 这是我的java代码 SparkConf sparkConf = new SparkConf().setAppName(“Hive”).setMaster(“yarn-cluster”); JavaSparkContext ctx = new JavaSparkContext(sparkConf); HiveContext sqlContext = new HiveContext(ctx.sc()); org.apache.spark.sql.Row[] result = sqlContext.sql(“Select * from Tablename”).collect(); 此外,我试图将master更改为本地,现在它抛出未知的主机名exception。 任何人都可以帮助我吗? 更新 错误日志 […]

在Pig Latin中为每个组写一个文件

问题:我有许多包含Apache Web服务器日志条目的文件。 这些条目不是按日期时间顺序排列,而是分散在文件中。 我正在尝试使用Pig来读取一天的文件,按日期时间对日志条目进行分组和排序,然后将它们写入以其包含的条目的日期和小时命名的文件。 设置:一旦我导入了我的文件,我使用Regex获取日期字段,然后我将其截断为小时。 这将生成一个集合,该集合在一个字段中包含记录,而日期在另一个字段中截断为小时。 从这里开始,我在日期时间字段上进行分组。 第一次尝试:我的第一个想法是使用STORE命令,同时使用FOREACH迭代我的组,并很快发现Pig并不酷。 第二次尝试:我的第二次尝试是在piggybank中使用MultiStorage()方法,这种方法很有效,直到我查看文件。 问题是MulitStorage想要将所有字段写入文件,包括我以前用于分组的字段。 我真正想要的只是写入文件的原始记录。 问题:那么……我是否将Pig用于不适合的事情,或者是否有更好的方法让我使用猪来解决这个问题? 现在我有了这个问题,我将编写一个简单的代码示例来进一步解释我的问题。 有了它,我会在这里发布。 提前致谢。

Hadoop中的CSV处理

我在csv文件中有6个字段: 首先是学生姓名( String ) 其他是学生的标记,如主题1,主题2等 我在java中编写mapreduce ,用逗号分割所有字段,并在键中发送学生姓名,并在地图值中标记。 在reduce我正在处理他们输出密钥中的学生姓名和theres标记加上减去的总值,平均值等。 我认为可能有一种替代的,更有效的方法来做到这一点。 有没有人知道更好的方法来做这些操作? 是否有任何内置function的hadoop可以按学生姓名分组,并且可以计算与该学生相关的总分和平均值?

输出文件包含Mapper输出而不是Reducer输出

嗨我试图在独立模式下使用map reduce技术找到少数数字的平均值。 我有两个输入文件。它包含值file1: 25 25 25 25 25和file2: 15 15 15 15 15 。 我的程序运行正常,但输出文件包含mapper的输出而不是reducer输出。 这是我的代码: import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.io.Writable; import java.io.*; public class Average { public static class SumCount implements Writable { public int […]