Hadoop MapReduce: driver for chaining mappers within a MapReduce job
I have a MapReduce job. The code of my Map class:

```java
public static class MapClass extends Mapper<Text, Text, Text, Text> {
    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
    }
}
```
I want to use ChainMapper:

```java
1. Job job = new Job(conf, "Job with chained tasks");
2. job.setJarByClass(MapReduce.class);
3. job.setInputFormatClass(TextInputFormat.class);
4. job.setOutputFormatClass(TextOutputFormat.class);
5. FileInputFormat.setInputPaths(job, new Path(InputFile));
6. FileOutputFormat.setOutputPath(job, new Path(OutputFile));
7. JobConf map1 = new JobConf(false);
8. ChainMapper.addMapper(job, MapClass.class,
       Text.class, Text.class, Text.class, Text.class,
       true, map1);
```
But it reports an error at line 8:

```
Multiple markers at this line
- occurrence of 'addMapper'
- The method addMapper(JobConf, Class<? extends Mapper>, Class, Class, Class, Class,
  boolean, JobConf) in the type ChainMapper is not applicable for the arguments
  (Job, Class, Class, Class, Class, Class, boolean, Configuration)
- Debug current instruction pointer
- The method addMapper(JobConf, Class<? extends Mapper>, Class, Class, Class, Class,
  boolean, JobConf) in the type ChainMapper is not applicable for the arguments
  (JobConf, Class, Class, Class, Class, Class, boolean, JobConf)
```
After a lot of "kung fu", I was able to use ChainMapper/ChainReducer. Thanks to user864846 for the last comment.
```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package myPKG;

/*
 * Ajitsen: Sample program for ChainMapper/ChainReducer. This program is a
 * modified version of the WordCount example available in Hadoop-0.18.0.
 * Added ChainMapper/ChainReducer and made it work in Hadoop 1.0.2.
 */
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainWordCount extends Configured implements Tool {

    public static class Tokenizer extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            System.out.println("Line:" + line);
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class UpperCaser extends MapReduceBase
            implements Mapper<Text, IntWritable, Text, IntWritable> {

        public void map(Text key, IntWritable value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String word = key.toString().toUpperCase();
            System.out.println("Upper Case:" + word);
            output.collect(new Text(word), value);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            System.out.println("Word:" + key.toString() + "\tCount:" + sum);
            output.collect(key, new IntWritable(sum));
        }
    }

    static int printUsage() {
        System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }
```
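The posted listing stops after `printUsage()`. For completeness, a driver along the following lines (a sketch based on the old `org.apache.hadoop.mapred` API, reconstructed from the `ChainMapper`/`ChainReducer` signatures rather than copied from the original answer) wires the two mappers and the reducer into one job. Note that the output types of each stage must match the input types of the next one (`Text`/`IntWritable` out of `Tokenizer`, into `UpperCaser`, into `Reduce`):

```java
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), ChainWordCount.class);
        conf.setJobName("wordcount");
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // First mapper in the chain: LongWritable/Text -> Text/IntWritable
        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, Tokenizer.class,
            LongWritable.class, Text.class, Text.class, IntWritable.class,
            true, mapAConf);

        // Second mapper: consumes the first mapper's output types
        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, UpperCaser.class,
            Text.class, IntWritable.class, Text.class, IntWritable.class,
            true, mapBConf);

        // Single reducer at the end of the chain
        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(conf, Reduce.class,
            Text.class, IntWritable.class, Text.class, IntWritable.class,
            true, reduceConf);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ChainWordCount(), args);
        System.exit(res);
    }
}
```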
EDIT In recent versions (at least from Hadoop 2.6 on), the `true` flag in addMapper is no longer needed. (In fact the signature changed and dropped it.) So it is just:

```java
JobConf mapAConf = new JobConf(false);
ChainMapper.addMapper(conf, Tokenizer.class,
    LongWritable.class, Text.class, Text.class, IntWritable.class,
    mapAConf);
```
You have to use Configuration instead of JobConf. JobConf is a subclass of Configuration, so there should be a constructor for that.
Actually, your mapper class must implement the org.apache.hadoop.mapred.Mapper interface. I had the same problem, and this fixed it.
As the first argument of ChainMapper.addMapper() you passed the job object, but the function expects an object of type JobConf. Rewriting it as:

```java
ChainMapper.addMapper((JobConf) conf, MapClass.class,
    Text.class, Text.class, Text.class, Text.class,
    true, map1);
```

should fix the problem.
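A caveat on that cast: `(JobConf) conf` only succeeds if the object really is a JobConf at runtime; casting a plain Configuration throws a ClassCastException, whereas wrapping it with the JobConf copy constructor always works. A dependency-free sketch using stand-in classes (not the real Hadoop types) illustrates the rule:

```java
public class CastDemo {
    // Stand-ins for the Hadoop classes, only to show the subtype relation.
    static class Configuration {}
    static class JobConf extends Configuration {
        JobConf() {}
        JobConf(Configuration base) {} // the real JobConf also has JobConf(Configuration)
    }

    public static void main(String[] args) {
        Configuration plain = new Configuration();
        Configuration chained = new JobConf();

        // A downcast succeeds only when the object actually is a JobConf:
        System.out.println(chained instanceof JobConf); // true  -> (JobConf) chained is safe
        System.out.println(plain instanceof JobConf);   // false -> (JobConf) plain would throw

        // Wrapping via the constructor works for any Configuration:
        JobConf wrapped = new JobConf(plain);
        System.out.println(wrapped instanceof JobConf); // true
    }
}
```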