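Java MapReduce: counting by date

I'm new to Hadoop and I'm trying to write a MapReduce program that finds the two most frequent entries for each month (dates grouped by month). My input looks like this: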

    2017-06-01 , A, B, A, C, B, E, F
    2017-06-02 , Q, B, Q, F, K, E, F
    2017-06-03 , A, B, A, R, T, E, E
    2017-07-01 , A, B, A, C, B, E, F
    2017-07-05 , A, B, A, G, B, G, G

So the result I expect from this MapReduce program looks like:

    2017-06, A:4, E:4
    2017-07, A:4, B:4

    public class ArrayGiulioTest {

        public static Logger logger = Logger.getLogger(ArrayGiulioTest.class);

        public static class CustomMap extends Mapper<LongWritable, Text, Text, TextWritable> {
            private Text word = new Text();

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                TextWritable array = new TextWritable();
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line, ",");
                String dataAttuale = tokenizer.nextToken().substring(0, line.lastIndexOf("-"));

                Text tmp = null;
                Text[] tmpArray = new Text[tokenizer.countTokens()];
                int i = 0;
                while (tokenizer.hasMoreTokens()) {
                    String prod = tokenizer.nextToken(",");
                    word.set(dataAttuale);
                    tmp = new Text(prod);
                    tmpArray[i] = tmp;
                    i++;
                }

                array.set(tmpArray);
                context.write(word, array);
            }
        }

        public static class CustomReduce extends Reducer<Text, TextWritable, Text, Text> {

            public void reduce(Text key, Iterator<TextWritable> values, Context context)
                    throws IOException, InterruptedException {
                MapWritable map = new MapWritable();
                Text txt = new Text();

                while (values.hasNext()) {
                    TextWritable array = values.next();
                    Text[] tmpArray = (Text[]) array.toArray();
                    for (Text t : tmpArray) {
                        if (map.get(t) != null) {
                            IntWritable val = (IntWritable) map.get(t);
                            map.put(t, new IntWritable(val.get() + 1));
                        } else {
                            map.put(t, new IntWritable(1));
                        }
                    }
                }

                Set<Writable> set = map.keySet();
                StringBuffer str = new StringBuffer();
                for (Writable k : set) {
                    str.append("key: " + k.toString() + " value: " + map.get(k) + "**");
                }
                txt.set(str.toString());
                context.write(key, txt);
            }
        }

        public static void main(String[] args) throws Exception {
            long inizio = System.currentTimeMillis();
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "countProduct");
            job.setJarByClass(ArrayGiulioTest.class);

            job.setMapperClass(CustomMap.class);
            //job.setCombinerClass(CustomReduce.class);
            job.setReducerClass(CustomReduce.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(TextWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
            long fine = System.currentTimeMillis();
            logger.info("**************************************End" + (fine - inizio));
            System.exit(1);
        }
    }

And I implemented my custom TextWritable this way:

    public class TextWritable extends ArrayWritable {
        public TextWritable() {
            super(Text.class);
        }
    }

...when I run my MapReduce program, I get this kind of result:

    2017-6 wordcount.TextWritable@3e960865
    2017-6 wordcount.TextWritable@3e960865

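Clearly my reducer is not working. The result looks like the output of my Mapper.

Any ideas? Can someone tell me whether this approach is correct?

Here is the console log (FYI, my input file has 6 lines instead of 5). I get the same result whether I launch the MapReduce job under Eclipse (single JVM) or on Hadoop with HDFS.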

    File System Counters
        FILE: Number of bytes read=1216
        FILE: Number of bytes written=431465
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=6
        Map output records=6
        Map output bytes=214
        Map output materialized bytes=232
        Input split bytes=97
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=232
        Reduce input records=6
        Reduce output records=6
        Spilled Records=12
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        Total committed heap usage (bytes)=394264576
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=208
    File Output Format Counters
        Bytes Written=1813

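I think you're trying to do too much work in the Mapper. You only need to group by date (and, judging by your expected output, you aren't formatting the dates correctly anyway).

For example, the following approach will turn these lines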

    2017-07-01 , A, B, A, C, B, E, F
    2017-07-05 , A, B, A, G, B, G, G

into this pair for the reducer

 2017-07 , ("A,B,A,C,B,E,F", "A,B,A,G,B,G,G") 

In other words, there is no real benefit to using ArrayWritable here; just keep the values as Text.
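The key derivation can be tried outside Hadoop first. The following is only a minimal sketch in plain Java: the class `MonthKeySketch` and its `split` method are hypothetical names, and it assumes dates are always in ISO `yyyy-MM-dd` form as in the sample input. It uses `java.time` rather than the `SimpleDateFormat` of the Mapper.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical helper, not part of Hadoop: splits one input line into (month, tokens).
final class MonthKeySketch {
    private static final DateTimeFormatter MONTH = DateTimeFormatter.ofPattern("yyyy-MM");

    // Turns "2017-07-01 , A, B, A, C, B, E, F" into ("2017-07", "A,B,A,C,B,E,F").
    static Map.Entry<String, String> split(String line) {
        int comma = line.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("no comma in line: " + line);
        }
        // Assumption: the first field is an ISO date (yyyy-MM-dd).
        LocalDate date = LocalDate.parse(line.substring(0, comma).trim());
        String month = YearMonth.from(date).format(MONTH);
        // Drop all whitespace so the value is a compact comma-separated list.
        String tokens = line.substring(comma + 1).replaceAll("\\s", "");
        return new SimpleEntry<>(month, tokens);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> e = split("2017-07-01 , A, B, A, C, B, E, F");
        System.out.println(e.getKey() + "\t" + e.getValue());
    }
}
```

Running `main` prints the month key and the compact token list separated by a tab, which is the shape of each record the reducer receives.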


So, the Mapper would look like this

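    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    class CustomMap extends Mapper<LongWritable, Text, Text, Text> {
        private final Text key = new Text();
        private final Text output = new Text();

        @Override
        protected void map(LongWritable offset, Text value, Context context)
                throws IOException, InterruptedException {
            final String valueStr = value.toString();
            // Search the String, not the Text: Text.find() returns a byte offset,
            // which only matches the String index for single-byte characters.
            int separatorIndex = valueStr.indexOf(',');
            if (separatorIndex < 0) {
                System.err.printf("mapper: not enough records for %s%n", valueStr);
                return;
            }

            // Everything before the first comma is the date; the rest are the tokens.
            String dateKey = valueStr.substring(0, separatorIndex).trim();
            String tokens = valueStr.substring(1 + separatorIndex).trim().replaceAll("\\p{Space}", "");

            SimpleDateFormat fmtFrom = new SimpleDateFormat("yyyy-MM-dd");
            SimpleDateFormat fmtTo = new SimpleDateFormat("yyyy-MM");

            try {
                // Reduce the full date to its month so all days of a month share one key.
                dateKey = fmtTo.format(fmtFrom.parse(dateKey));
                key.set(dateKey);
            } catch (ParseException ex) {
                System.err.printf("mapper: invalid key format %s%n", dateKey);
                return;
            }

            output.set(tokens);
            context.write(key, output);
        }
    }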

The reducer can then build a Map that collects and counts the tokens found in the value strings. Again, only Text is written out.

    import java.io.IOException;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    class CustomReduce extends Reducer<Text, Text, Text, Text> {
        private final Text output = new Text();

        @Override
        protected void reduce(Text date, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // TreeMap keeps the tokens sorted alphabetically in the output.
            Map<String, Integer> keyMap = new TreeMap<>();
            for (Text v : values) {
                String[] keys = v.toString().trim().split(",");
                for (String key : keys) {
                    if (!keyMap.containsKey(key)) {
                        keyMap.put(key, 0);
                    }
                    keyMap.put(key, 1 + keyMap.get(key));
                }
            }

            output.set(mapToString(keyMap));
            context.write(date, output);
        }

        // Formats the counts as "A:4, B:4, ...".
        private String mapToString(Map<String, Integer> map) {
            StringBuilder sb = new StringBuilder();
            String delimiter = ", ";
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                sb.append(
                    String.format("%s:%d", entry.getKey(), entry.getValue())
                ).append(delimiter);
            }
            // Remove the trailing delimiter.
            sb.setLength(sb.length() - delimiter.length());
            return sb.toString();
        }
    }

Given your input, I got this

    2017-06 A:4, B:4, C:1, E:4, F:3, K:1, Q:2, R:1, T:1
    2017-07 A:4, B:4, C:1, E:1, F:1, G:3
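This output lists every count, while the question asks for only the two most frequent entries per month. A minimal sketch of that last step in plain Java might look like the following. The class `TopNSketch` and the method `topN` are hypothetical names, and breaking ties alphabetically is an assumption: the question does not say how ties such as A, B and E in June should be resolved.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper: keeps only the n most frequent entries of a count map.
final class TopNSketch {
    static String topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                // Highest count first; ties broken alphabetically by key (an assumption).
                .sorted((a, b) -> {
                    int byCount = Integer.compare(b.getValue(), a.getValue());
                    return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
                })
                .limit(n)
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // Counts for 2017-07, taken from the reducer output.
        Map<String, Integer> july = new TreeMap<>();
        july.put("A", 4);
        july.put("B", 4);
        july.put("C", 1);
        july.put("E", 1);
        july.put("F", 1);
        july.put("G", 3);
        System.out.println("2017-07\t" + topN(july, 2));
    }
}
```

In the reducer, a call such as `topN(keyMap, 2)` could stand in for `mapToString(keyMap)`. With the alphabetical tie-break, June yields A and B rather than the A and E shown in the question, so the comparator would need adjusting if a different tie rule is wanted.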

The main problem was the signature of the reduce method:

I wrote:

    public void reduce(Text key, Iterator<TextWritable> values, Context context)

instead of

    public void reduce(Text key, Iterable<TextWritable> values, Context context)

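That is why I was getting my Map output instead of my Reduce output: a method taking an Iterator does not override Reducer.reduce, so Hadoop ran the default reduce, which writes every (key, value) pair through unchanged.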
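A mismatch like this can be caught at compile time with the `@Override` annotation. The sketch below reproduces the effect in plain Java without Hadoop. `BaseReducerSketch` is a hypothetical stand-in for a framework base class, not Hadoop's `Reducer`, and the other class names are likewise made up for illustration.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a framework base class (not Hadoop's Reducer):
// run() always calls reduce(String, Iterable<String>), which defaults to a pass-through.
class BaseReducerSketch {
    protected String reduce(String key, Iterable<String> values) {
        return key + " -> identity";
    }

    final String run(String key, List<String> values) {
        return reduce(key, values);
    }
}

// Iterator instead of Iterable: this declares a new overload, so run() never calls it.
class WrongSignature extends BaseReducerSketch {
    // Putting @Override on this method would be a compile error, exposing the mistake.
    protected String reduce(String key, Iterator<String> values) {
        return key + " -> custom";
    }
}

// Matching parameter types: this really overrides the base method.
class RightSignature extends BaseReducerSketch {
    @Override
    protected String reduce(String key, Iterable<String> values) {
        return key + " -> custom";
    }
}

final class OverrideDemo {
    public static void main(String[] args) {
        List<String> values = Arrays.asList("A", "B");
        System.out.println(new WrongSignature().run("2017-06", values)); // 2017-06 -> identity
        System.out.println(new RightSignature().run("2017-06", values)); // 2017-06 -> custom
    }
}
```

Annotating `map` and `reduce` with `@Override` in a real job gives the same protection: the compiler rejects the method if its parameter types do not match the ones the framework calls.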