Hadoop error in execution: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable

I am implementing a PageRank algorithm on Hadoop and, as the title says, I get the following error when I try to run the code:

Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable

In my input file I store the graph node IDs as keys, with some information about each node as the value. My input file has the following format:

1\t3.4,2,5,6,67

4\t4.2,77,2,7,83

...
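
To make the format concrete, this is how I expect one such line to split (a plain-Java sketch, separate from the job code; the variable names are just for illustration):

String line = "1\t3.4,2,5,6,67";
String[] kv = line.split("\t");      // kv[0] = "1" is the node ID (the key)
String[] fields = kv[1].split(",");  // fields[0] = "3.4" is the float my map works with;
                                     // the remaining fields describe the node's links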

Trying to make sense of the error led me to use LongWritable as my main variable type, as shown in the code below. That means I have:

map<LongWritable, LongWritable, LongWritable, LongWritable>

reduce<LongWritable, LongWritable, LongWritable, LongWritable>

However, I also tried other combinations of the type parameters for map and reduce (swapping in Text for LongWritable), and I always end up with the same error. I think I don't understand what "expected" and "received" mean in the message. Does it mean that my map function gets a LongWritable from my input file when it should be getting a Text? Is something wrong with my input file format, or with my variable types?

Here is the complete code. Can you tell me what to change, and where?

import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.lang.Object.*;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.log4j.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Pagerank {

    public static class PRMap extends Mapper<LongWritable, LongWritable, LongWritable, LongWritable> {

        public void map(LongWritable lineNum, LongWritable line,
                        OutputCollector<LongWritable, LongWritable> outputCollector,
                        Reporter reporter) throws IOException, InterruptedException {
            if (line.toString().length() == 0) {
                return;
            }

            Text key = new Text();
            Text value = new Text();
            LongWritable valuel = new LongWritable();
            StringTokenizer spline = new StringTokenizer(line.toString(), "\t");
            key.set(spline.nextToken());
            value.set(spline.nextToken());

            valuel.set(Long.parseLong(value.toString()));
            outputCollector.collect(lineNum, valuel);

            String info = value.toString();
            String splitter[] = info.split(",");

            if (splitter.length >= 3) {
                float f = Float.parseFloat(splitter[0]);
                float pagerank = f / (splitter.length - 2);

                for (int i = 2; i < splitter.length; i++) {
                    LongWritable key2 = new LongWritable();
                    LongWritable value2 = new LongWritable();
                    long l;

                    l = Long.parseLong(splitter[i]);
                    key2.set(l);
                    //key2.set(splitter[i]);
                    value2.set((long) f);
                    outputCollector.collect(key2, value2);
                }
            }
        }
    }

    public static class PRReduce extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

        private Text result = new Text();

        public void reduce(LongWritable key, Iterator<LongWritable> values,
                           OutputCollector<LongWritable, LongWritable> results,
                           Reporter reporter) throws IOException, InterruptedException {
            float pagerank = 0;
            String allinone = ",";

            while (values.hasNext()) {
                LongWritable temp = values.next();
                String converted = temp.toString();
                String[] splitted = converted.split(",");

                if (splitted.length > 1) {
                    for (int i = 1; i < splitted.length; i++) {
                        allinone = allinone.concat(splitted[i]);
                        if (i != splitted.length - 1)
                            allinone = allinone.concat(",");
                    }
                } else {
                    float f = Float.parseFloat(splitted[0]);
                    pagerank = pagerank + f;
                }
            }

            String last = Float.toString(pagerank);
            last = last.concat(allinone);

            LongWritable value = new LongWritable();
            value.set(Long.parseLong(last));
            results.collect(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "pagerank_itr0");
        job.setJarByClass(Pagerank.class);
        job.setMapperClass(Pagerank.PRMap.class);
        job.setReducerClass(Pagerank.PRReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.waitForCompletion(true);
    }
}

You are not setting the Mapper output classes in the job configuration. Try setting the map output key and value classes on the Job with:

setMapOutputKeyClass();

setMapOutputValueClass();
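
Concretely, assuming the mapper really is meant to emit LongWritable keys and values (which is what the collect() calls in your code do), the driver would gain two lines. This is only a sketch of the job setup, not a fix for anything else in the code:

Job job = new Job(conf, "pagerank_itr0");
job.setJarByClass(Pagerank.class);
job.setMapperClass(Pagerank.PRMap.class);
job.setReducerClass(Pagerank.PRReduce.class);

// Declare the types of the intermediate (map-side) pairs explicitly.
// If these are not set, Hadoop falls back to the final output classes,
// and any mismatch with what the mapper actually writes surfaces as
// "Type mismatch in key from map: expected ..., received ...".
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(LongWritable.class);

// Types of the final (reduce-side) output, as in the original driver.
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);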