在Hadoop上解析Stack Overflow的posts.xml

我正在参照CodeProject上Anoop Madhusudanan的这篇文章构建一个推荐引擎——不是在集群上,而是在我自己的机器上。

问题出现在我尝试解析posts.xml的时候。该文件的结构如下(每个帖子对应一个row元素,其属性包括Id、PostTypeId、ParentId、OwnerUserId等):

 

现在我需要在Hadoop上解析这个文件(大小为1.4 GB)。我已经用Java编写了代码,并把它打成了jar包。Java类如下:

 import java.io.IOException; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilder; import org.w3c.dom.Document; import org.w3c.dom.NodeList; import org.w3c.dom.Node; import org.w3c.dom.Element; import java.io.File; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; public class Recommend { static class Map extends Mapper { Path path; String fXmlFile; DocumentBuilderFactory dbFactory; DocumentBuilder dBuilder; Document doc; /** * Given an output filename, write a bunch of random records to it. */ public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException { try{ fXmlFile=value.toString(); dbFactory = DocumentBuilderFactory.newInstance(); dBuilder= dbFactory.newDocumentBuilder(); doc= dBuilder.parse(fXmlFile); doc.getDocumentElement().normalize(); NodeList nList = doc.getElementsByTagName("row"); for (int temp = 0; temp < nList.getLength(); temp++) { Node nNode = nList.item(temp); Element eElement = (Element) nNode; Text keyWords =new Text(eElement.getAttribute("OwnerUserId")); Text valueWords = new Text(eElement.getAttribute("ParentId")); String val=keyWords.toString()+" "+valueWords.toString(); // Write the sentence if(keyWords != null && valueWords != null){ output.collect(keyWords, new Text(val)); } } }catch (Exception e) { e.printStackTrace(); } } } /** * * @throws IOException */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //String[] otherArgs = new GenericOptionsParser(conf, 
args).getRemainingArgs(); /*if (args.length != 2) { System.err.println("Usage: wordcount  "); System.exit(2); }*/ // FileSystem fs = FileSystem.get(conf); Job job = new Job(conf, "Recommend"); job.setJarByClass(Recommend.class); // the keys are words (strings) job.setOutputKeyClass(Text.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); // the values are counts (ints) job.setOutputValueClass(Text.class); job.setMapperClass(Map.class); //conf.setReducerClass(Reduce.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); Path outPath = new Path(args[1]); FileSystem dfs = FileSystem.get(outPath.toUri(), conf); if (dfs.exists(outPath)) { dfs.delete(outPath, true); } } } 

我希望Hadoop中的输出文件每行包含“OwnerUserId ParentId”,但实际得到的输出是:

 1599788  

我不知道mapper输出的键值1599788是从哪里来的。

我不太了解如何为Hadoop编写mapper类,需要有人帮我修改代码,以得到所需的输出。

提前致谢。

经过大量的研究和实验,我终于学会了如何编写map方法来解析具有上述格式的XML文件。我改变了做法,下面是我的新mapper代码……它适用于我的用例。

希望它能帮到别人,为他们节省一些时间 :)

 import java.io.IOException; import java.util.StringTokenizer; import javax.xml.parsers.ParserConfigurationException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.xml.sax.SAXException; public class Map extends Mapper { NullWritable obj; @Override public void map(LongWritable key, Text value, Context context) throws InterruptedException { StringTokenizer tok= new StringTokenizer(value.toString()); String pa=null,ow=null,pi=null,v; while (tok.hasMoreTokens()) { String[] arr; String val = (String) tok.nextToken(); if(val.contains("PostTypeId")){ arr= val.split("[\"]"); pi=arr[arr.length-1]; if(pi.equals("2")){ continue; } else break; } if(val.contains("ParentId")){ arr= val.split("[\"]"); pa=arr[arr.length-1]; } else if(val.contains("OwnerUserId") ){ arr= val.split("[\"]"); ow=arr[arr.length-1]; try { if(pa!=null && ow != null){ v=String.format("{0},{1}", ow,pa); context.write(obj,new Text(v)); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } 

这是我编写的mapper,用于解析整个posts.xml,并在Hadoop上生成一个制表符分隔的文件,供其他MapReduce作业、Hive或Pig使用。

映射器

 package com.aravind.learning.hadoop.mapred.techtalks; import java.io.IOException; import java.io.StringReader; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.xml.sax.InputSource; import org.xml.sax.SAXException; import com.google.common.base.Joiner; public class StackoverflowDataWranglerMapper extends Mapper { static enum BadRecordCounters { NO_CREATION_DATE, UNKNOWN_USER_ID, UNPARSEABLE_RECORD, UNTAGGED_POSTS } private final Text outputKey = new Text(); private final Text outputValue = new Text(); private final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); private DocumentBuilder builder; private static final Joiner TAG_JOINER = Joiner.on(",").skipNulls(); // 2008-07-31T21:42:52.667 private static final DateFormat DATE_PARSER = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); private static final SimpleDateFormat DATE_BUILDER = new SimpleDateFormat("yyyy-MM-dd"); @Override protected void setup(Context context) throws IOException, InterruptedException { try { builder = factory.newDocumentBuilder(); } catch (ParserConfigurationException e) { new IOException(e); } } @Override protected void map(LongWritable inputKey, Text inputValue, Mapper.Context context) throws IOException, InterruptedException { try { String entry = inputValue.toString(); if (entry.contains(" 0) { try { parsedDate = DATE_PARSER.parse(creationDate); } catch (ParseException e) { context.getCounter("Bad Record Counters", "Posts missing CreationDate").increment(1); } } if (postedBy.length() == 0 || postedBy.trim().equals("-1")) { context.getCounter("Bad Record Counters", 
"Posts with either empty UserId or UserId contains '-1'") .increment(1); try { parsedDate = DATE_BUILDER.parse("2100-00-01"); } catch (ParseException e) { // ignore } } tags = tags.trim(); String tagTokens[] = null; if (tags.length() > 1) { tagTokens = tags.substring(1, tags.length() - 1).split("><"); } else { context.getCounter("Bad Record Counters", "Untagged Posts").increment(1); } outputKey.clear(); outputKey.set(id); StringBuilder sb = new StringBuilder(postedBy).append("\t").append(parsedDate.getTime()).append("\t") .append(postTypeId).append("\t").append(title).append("\t").append(viewCount).append("\t").append(score) .append("\t"); if (tagTokens != null) { sb.append(TAG_JOINER.join(tagTokens)).append("\t"); } else { sb.append("").append("\t"); } sb.append(answerCount).append("\t").append(commentCount).append("\t").append(favoriteCount).toString(); outputValue.set(sb.toString()); context.write(outputKey, outputValue); } } catch (SAXException e) { context.getCounter("Bad Record Counters", "Unparsable records").increment(1); } finally { builder.reset(); } } } 

驱动程序(Driver)

 public class StackoverflowDataWranglerDriver extends Configured implements Tool { @Override public int run(String[] args) throws Exception { if (args.length != 2) { System.err.printf("Usage: %s [generic options]  \n", getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } Job job = Job.getInstance(getConf()); job.setJobName("Tech Talks - Stackoverflow Forum Posts - Data Wrangler"); TextInputFormat.addInputPath(job, new Path(args[0])); TextOutputFormat.setOutputPath(job, new Path(args[1])); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setJarByClass(StackoverflowDataWranglerMapper.class);// required for mr1 job.setMapperClass(StackoverflowDataWranglerMapper.class); job.setNumReduceTasks(0); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String args[]) throws Exception { int exitCode = ToolRunner.run(new Configuration(), new StackoverflowDataWranglerDriver(), args); System.exit(exitCode); } } 

作业提交命令

 # Submit the job: hadoop jar <jar file> <driver class> <input path> <output path>
 hadoop jar ./hadoop-examples-0.0.1-SNAPSHOT.jar com.aravind.learning.hadoop.mapred.techtalks.StackoverflowDataWranglerDriver data/stackoverflow-posts.xml data/so-posts-tsv