MapReduce composite key sample – does not show the desired output

As a newcomer to the MapReduce and Hadoop world, after trying out basic MapReduce programs I wanted to try the composite key sample code.

The input data set is as follows:

    Country,State,County,PopulationInMillions
    USA,CA,Alameda,100
    USA,CA,losangels,200
    USA,CA,Sacramento,100
    USA,FL,xxx,10
    USA,FL,YYY,12

The desired output data should look like this:

    USA,CA,400
    USA,FL,22

Instead, with the Country + State fields forming the composite key, I get the following output. For some reason the population is not being added up. Can someone point out the mistake I am making? Please also take a look at the Country.java class, which implements the WritableComparable interface; something may be wrong with that implementation.

    USA,CA,100
    USA,CA,200
    USA,CA,100
    USA,FL,10
    USA,FL,12

The population is not being summed for each Country + State.

Here is the Country class, which implements the WritableComparable interface:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    /**
     * The Country class implements WritableComparable to provide the custom
     * sorting needed for the group-by operation: it sorts by country, then by state.
     */
    public class Country implements WritableComparable<Country> {

        Text country;
        Text state;

        public Country(Text country, Text state) {
            this.country = country;
            this.state = state;
        }

        public Country() {
            this.country = new Text();
            this.state = new Text();
        }

        /* Serialize both fields, country first. */
        public void write(DataOutput out) throws IOException {
            this.country.write(out);
            this.state.write(out);
        }

        /* Deserialize the fields in the same order they were written. */
        public void readFields(DataInput in) throws IOException {
            this.country.readFields(in);
            this.state.readFields(in);
        }

        /* Compare by country first, then by state. */
        public int compareTo(Country pop) {
            if (pop == null)
                return 0;
            int intcnt = country.compareTo(pop.country);
            if (intcnt != 0) {
                return intcnt;
            }
            return state.compareTo(pop.state);
        }

        @Override
        public String toString() {
            return country.toString() + ":" + state.toString();
        }
    }
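A quick way to sanity-check a custom Writable like this is a serialize/deserialize round trip. The following is an illustrative sketch (the CountryRoundTrip class name is mine, not from the post), assuming Country compiles as shown above:

    import java.io.*;
    import org.apache.hadoop.io.Text;

    public class CountryRoundTrip {
        public static void main(String[] args) throws IOException {
            Country in = new Country(new Text("USA"), new Text("CA"));

            // Write the key to an in-memory buffer...
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bytes));

            // ...and read it back into a fresh instance.
            Country out = new Country();
            out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(out);                      // expected: USA:CA
            System.out.println(in.compareTo(out) == 0);   // expected: true
        }
    }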

Driver program:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CompositeKeyDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "CompositeKeyDriver");

            // args[0] is the location of the input data set.
            FileInputFormat.addInputPath(job, new Path(args[0]));

            // args[1] is the location of the output path.
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setJarByClass(CompositeKeyDriver.class);
            job.setMapperClass(CompositeKeyMapper.class);
            job.setReducerClass(CompositeKeyReducer.class);
            job.setOutputKeyClass(Country.class);
            job.setOutputValueClass(IntWritable.class);

            // Delete the output path from HDFS automatically so it doesn't have to be deleted by hand.
            Path outputPath = new Path(args[1]);
            outputPath.getFileSystem(conf).delete(outputPath, true);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
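One note on the configuration above: because the mapper's output types match the job's final output types, setOutputKeyClass/setOutputValueClass are sufficient. If they ever differed, the driver would also need the map-side setters, shown here as a hypothetical fragment:

    // Hypothetical: only needed if the map output key/value types differed
    // from the job's final output types. In this job they are the same.
    job.setMapOutputKeyClass(Country.class);
    job.setMapOutputValueClass(IntWritable.class);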

Mapper program:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // The first two type parameters are the input key and input value: the input key is the offset
    // of each line (each line is a record), and the input value is the line itself.
    // The second two are the output key and output value of the Mapper. Note that the outputs of
    // the mapper are stored on the local file system, not on HDFS.
    // Output key = a Country object; output value = the population in millions for that
    // country + state combination.
    public class CompositeKeyMapper extends Mapper<LongWritable, Text, Country, IntWritable> {

        /** The country key. */
        Country cntry = new Country();

        /** The country text. */
        Text cntText = new Text();

        /** The state text. */
        Text stateText = new Text();

        /** The population for a country + state. */
        IntWritable populat = new IntWritable();

        /**
         * Reducers are optional in MapReduce; if no Reducer is defined, the output of the
         * Mapper is written directly to disk without sorting.
         */
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // The record reader gives each record (line) to the Mapper.
            // The line is split on the delimiter ",".
            String line = value.toString();
            String[] keyvalue = line.split(",");

            // Country is the first field in each record.
            cntText.set(new Text(keyvalue[0]));

            // State is the second field in each record.
            stateText.set(keyvalue[1]);

            // The population. Java primitive types can't be sent to the Context object, because
            // they don't serialize and deserialize well, so we use the equivalent Writable
            // datatypes provided by the MapReduce framework.
            populat.set(Integer.parseInt(keyvalue[3]));

            // Create a Country object, assigning the country name and state in the constructor.
            Country cntry = new Country(cntText, stateText);

            // Pass the Country object and its population to the context. Country implements the
            // WritableComparable interface (the Hadoop equivalent of Java's Comparable); that
            // implementation, in Country.java, is what allows Country objects to be sorted in
            // the shuffle phase. Without WritableComparable, the keys could not be sorted.
            context.write(cntry, populat);
        }
    }
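To make the parsing concrete, here is what the mapper's split logic produces for one record from the sample input (a standalone sketch; the ParseDemo class name is mine):

    // Standalone illustration of the mapper's parsing of one record.
    public class ParseDemo {
        public static void main(String[] args) {
            String line = "USA,CA,Alameda,100";
            String[] keyvalue = line.split(",");
            // keyvalue[0] = "USA" (country), keyvalue[1] = "CA" (state),
            // keyvalue[2] = "Alameda" (county, unused), keyvalue[3] = "100" (population)
            int population = Integer.parseInt(keyvalue[3]);
            System.out.println(keyvalue[0] + ":" + keyvalue[1] + " -> " + population); // USA:CA -> 100
        }
    }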

Reducer program:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    // The two output parameters of the Mapper class become the first two input parameters
    // of the Reducer class.
    public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, IntWritable> {

        // The first parameter of the reduce method is the Country key, which holds the country
        // name and state name (see Country.java for details).
        // The second parameter, "values", is the collection of populations for that
        // Country + State composite key.
        public void reduce(Country key, Iterator<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            int cnt = 0;
            while (values.hasNext()) {
                cnt = cnt + values.next().get();
            }

            context.write(key, new IntWritable(cnt));
        }
    }
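For comparison, the new-API Reducer base class dispatches to reduce(key, Iterable&lt;IntWritable&gt; values, context); a method that instead takes an Iterator, as above, does not override it, and the framework falls back to the identity reduce, writing each value through unsummed, which matches the output shown earlier. A minimal sketch of the overriding version:

    // Sketch: the signature the framework actually calls. The @Override
    // annotation makes the compiler flag a mismatch like Iterator vs. Iterable.
    @Override
    public void reduce(Country key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int cnt = 0;
        for (IntWritable value : values) {
            cnt = cnt + value.get();
        }
        context.write(key, new IntWritable(cnt));
    }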

You are using the HashPartitioner, so your Country class needs to implement the hashCode() method.

At the moment it will use the default hashCode() implementation on Object, which will result in your keys not being grouped correctly.

Here is an example hashCode() method:

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((country == null) ? 0 : country.hashCode());
        result = prime * result + ((state == null) ? 0 : state.hashCode());
        return result;
    }
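Since hashCode() is being overridden, standard Java practice is to keep equals() consistent with it; a companion sketch in the same style:

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        Country other = (Country) obj;
        if (country == null ? other.country != null : !country.equals(other.country))
            return false;
        return state == null ? other.state == null : state.equals(other.state);
    }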

Additional information:

To be safe, you should set (copy into) the Text objects rather than assigning the references. At the moment you do this in the Country constructor:

    public Country(Text country, Text state) {
        this.country = country;
        this.state = state;
    }

You should change it to:

    public Country(Text country, Text state) {
        // The fields must be initialized before set() can copy into them.
        this.country = new Text();
        this.state = new Text();
        this.country.set(country);
        this.state.set(state);
    }
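The reason the copy matters is that Hadoop reuses Writable instances between calls, so holding on to a caller's reference can silently alias mutable state. An illustrative sketch (the AliasDemo class name is mine):

    import org.apache.hadoop.io.Text;

    public class AliasDemo {
        public static void main(String[] args) {
            Text t = new Text("USA");
            Country c = new Country(t, new Text("CA"));
            t.set("CANADA"); // the caller reuses its Text, as Hadoop's record reader does
            System.out.println(c); // reference-assigning constructor: "CANADA:CA"
                                   // set()-copying constructor:       "USA:CA"
        }
    }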

The reducer problem is now fixed. I did not make any changes to the code. All I did was restart my Cloudera Hadoop image.

I noticed the following during debugging. Can someone comment on these observations?

  1. Changing the code, rebuilding the jar file, and re-running the MapReduce job often does not reflect the changes in the output. It does not happen every time. I am not sure whether the Hadoop daemons need to be restarted once in a while.