Global variable's value does not change after a for loop

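I'm working on a Hadoop project. I want to find the customers on a given day and then write out the ones with the maximum consumption on that day. In my reducer class, for some reason, the global variable max does not change its value after the for loop.

EDIT: I want to find the customers with the maximum consumption on a given day. I have managed to find the customers on the date I want, but I'm facing a problem in my Reducer class. Here is the code:

EDIT #2: I already know that the values (consumption) are natural numbers. So in my output file I want the customers, for a given day, with the maximum consumption.

EDIT #3: My input file contains a lot of data. It has three columns: the customer's ID, the timestamp (yyyy-MM-dd HH:mm:ss) and the consumption.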

Driver class

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class alicanteDriver {

        public static void main(String[] args) throws Exception {
            long t_start = System.currentTimeMillis();
            long t_end;

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Alicante");
            job.setJarByClass(alicanteDriver.class);

            job.setMapperClass(alicanteMapperC.class);
            //job.setCombinerClass(alicanteCombiner.class);
            job.setPartitionerClass(alicantePartitioner.class);
            job.setNumReduceTasks(2);
            job.setReducerClass(alicanteReducerC.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path("/alicante_1y.txt"));
            FileOutputFormat.setOutputPath(job, new Path("/alicante_output"));

            job.waitForCompletion(true);
            t_end = System.currentTimeMillis();
            System.out.println((t_end - t_start) / 1000);
        }
    }

Mapper class

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class alicanteMapperC extends Mapper<LongWritable, Text, Text, IntWritable> {

        String Customer = new String();
        SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date t = new Date();
        IntWritable Consumption = new IntWritable();
        int counter = 0;

        // new vars
        int max = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Date d2 = null;
            try {
                d2 = ft.parse("2013-07-01 01:00:00");
            } catch (ParseException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }

            if (counter > 0) {
                String line = value.toString();
                StringTokenizer itr = new StringTokenizer(line, ",");

                while (itr.hasMoreTokens()) {
                    Customer = itr.nextToken();
                    try {
                        t = ft.parse(itr.nextToken());
                    } catch (ParseException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    Consumption.set(Integer.parseInt(itr.nextToken()));

                    //sort out as many values as possible
                    if (Consumption.get() > max) {
                        max = Consumption.get();
                    }

                    //find customers in a certain date
                    if (t.compareTo(d2) == 0 && Consumption.get() == max) {
                        context.write(new Text(Customer), Consumption);
                    }
                }
            }
            counter++;
        }
    }

Reducer class

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import com.google.common.collect.Iterables;

    public class alicanteReducerC extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int max = 0; //this var

            // declaration of Lists
            List<Text> l1 = new ArrayList<Text>();
            List<IntWritable> l2 = new ArrayList<IntWritable>();

            for (IntWritable val : values) {
                if (val.get() > max) {
                    max = val.get();
                }
                l1.add(key);
                l2.add(val);
            }

            for (int i = 0; i < l1.size(); i++) {
                if (l2.get(i).get() == max) {
                    context.write(key, new IntWritable(max));
                }
            }
        }
    }

Some values of the input file

    C11FA586148,2013-07-01 01:00:00,3
    C11FA586152,2015-09-01 15:22:22,3
    C11FA586168,2015-02-01 15:22:22,1
    C11FA586258,2013-07-01 01:00:00,5
    C11FA586413,2013-07-01 01:00:00,5
    C11UA487446,2013-09-01 15:22:22,3
    C11UA487446,2013-07-01 01:00:00,3
    C11FA586148,2013-07-01 01:00:00,4

The output should be

    C11FA586258 5
    C11FA586413 5

I have searched the forum for a couple of hours and still can't find the issue. Any ideas?

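Here is the refactored code: you can pass in or change the specific date of consumption. In this case you don't need a reducer. My first answer queried the max consumption from the input; this answer queries the consumption for a user-provided date from the input.
The setup method reads the user-provided value of mapper.maxConsumption.date and makes it available to the map method.
The cleanup method in the reducer scans all customers' maximum consumption and writes out the ones that match the final max in the input (5 in this case). See the screenshot for the detailed execution log:

Run as:

    hadoop jar maxConsumption.jar -Dmapper.maxConsumption.date="2013-07-01 01:00:00" Data/input.txt output/maxConsupmtion5

    #input:
    C11FA586148,2013-07-01 01:00:00,3
    C11FA586152,2015-09-01 15:22:22,3
    C11FA586168,2015-02-01 15:22:22,1
    C11FA586258,2013-07-01 01:00:00,5
    C11FA586413,2013-07-01 01:00:00,5
    C11UA487446,2013-09-01 15:22:22,3
    C11UA487446,2013-07-01 01:00:00,3
    C11FA586148,2013-07-01 01:00:00,4

    #output:
    C11FA586258 5
    C11FA586413 5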

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.LinkedHashMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class maxConsumption extends Configured implements Tool {

        public static class DataMapper extends Mapper<Object, Text, Text, IntWritable> {
            SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date dateInFile, filterDate;
            int lineno = 0;
            private final static Text customer = new Text();
            private final static IntWritable consumption = new IntWritable();
            private final static Text maxConsumptionDate = new Text();

            // read the user-provided filter date once per mapper
            public void setup(Context context) {
                Configuration config = context.getConfiguration();
                maxConsumptionDate.set(config.get("mapper.maxConsumption.date"));
            }

            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                try {
                    lineno++;
                    filterDate = ft.parse(maxConsumptionDate.toString());

                    //map data from line/file
                    String[] fields = value.toString().split(",");
                    customer.set(fields[0].trim());
                    dateInFile = ft.parse(fields[1].trim());
                    consumption.set(Integer.parseInt(fields[2].trim()));

                    if (dateInFile.equals(filterDate)) //only send to reducer if date filter matches....
                        context.write(new Text(customer), consumption);
                } catch (Exception e) {
                    System.err.println("Invalid data at line: " + lineno + " Error: " + e.getMessage());
                }
            }
        }

        public static class DataReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            // per-customer maximum, kept until cleanup()
            LinkedHashMap<String, Integer> maxConsumption = new LinkedHashMap<String, Integer>();

            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int max = 0;
                System.out.print("reducer received: " + key + " [ ");
                for (IntWritable value : values) {
                    System.out.print(value.get() + " ");
                    if (value.get() > max)
                        max = value.get();
                }
                System.out.println(" ]");
                System.out.println(key.toString() + " max is " + max);
                maxConsumption.put(key.toString(), max);
            }

            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                int max = 0;
                //first find the max from reducer
                for (String key : maxConsumption.keySet()) {
                    System.out.println("cleanup customer : " + key.toString()
                            + " consumption : " + maxConsumption.get(key) + " max: " + max);
                    if (maxConsumption.get(key) > max)
                        max = maxConsumption.get(key);
                }
                System.out.println("final max is: " + max);

                //write only the max value from map
                for (String key : maxConsumption.keySet()) {
                    if (maxConsumption.get(key) == max)
                        context.write(new Text(key), new IntWritable(maxConsumption.get(key)));
                }
            }
        }

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new maxConsumption(), args);
            System.exit(res);
        }

        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: -Dmapper.maxConsumption.date=\"2013-07-01 01:00:00\"  ");
                System.exit(2);
            }
            Configuration conf = this.getConf();
            Job job = Job.getInstance(conf, "get-max-consumption");
            job.setJarByClass(maxConsumption.class);
            job.setMapperClass(DataMapper.class);
            // cleanup() computes the max per reduce task, so this relies on
            // the default of a single reduce task
            job.setReducerClass(DataReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // remove a previous output directory so the job can be re-run
            FileSystem fs = null;
            Path dstFilePath = new Path(args[1]);
            try {
                fs = dstFilePath.getFileSystem(conf);
                if (fs.exists(dstFilePath))
                    fs.delete(dstFilePath, true);
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }
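The two-step idea in the reducer (keep each customer's maximum in reduce, then pick the overall maximum in cleanup) can be tried without a cluster. The following is only a plain-Java sketch of that logic, not the Hadoop job itself; the class name MaxConsumptionSketch and the method topCustomers are made up for illustration, and the rows are assumed to be already filtered to the target date.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hadoop-free sketch of the reduce() + cleanup() logic.
// MaxConsumptionSketch and topCustomers are hypothetical names.
class MaxConsumptionSketch {

    // rows: {customerId, consumption} pairs, assumed already filtered by date.
    static List<String> topCustomers(List<String[]> rows) {
        // Step 1 (the role of reduce()): keep the largest value per customer.
        Map<String, Integer> perCustomerMax = new LinkedHashMap<>();
        for (String[] row : rows) {
            perCustomerMax.merge(row[0], Integer.parseInt(row[1]), Integer::max);
        }

        // Step 2 (the role of cleanup()): find the overall maximum...
        int max = Integer.MIN_VALUE;
        for (int value : perCustomerMax.values()) {
            max = Math.max(max, value);
        }

        // ...and keep every customer that ties with it.
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : perCustomerMax.entrySet()) {
            if (entry.getValue() == max) {
                result.add(entry.getKey() + " " + entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Rows of the sample input dated 2013-07-01 01:00:00.
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"C11FA586148", "3"});
        rows.add(new String[] {"C11FA586258", "5"});
        rows.add(new String[] {"C11FA586413", "5"});
        rows.add(new String[] {"C11UA487446", "3"});
        rows.add(new String[] {"C11FA586148", "4"});
        System.out.println(topCustomers(rows)); // [C11FA586258 5, C11FA586413 5]
    }
}
```

The overall maximum is only known after every key has been seen, which is why the job defers writing to cleanup. With more than one reduce task, each task would see only its own keys and would report its own local maximum.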

[Screenshot: maxconsumption]

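Probably all the values that reach your reducer are below 0. Try starting from the minimum value to determine whether your variable changes.

    max = Integer.MIN_VALUE;

Based on what you describe, the output should be either only 0s (in which case 0 is the max value in the reducer) or no output at all (all the values are less than 0). Also, look at this

    context.write(key, new IntWritable());

It should be

    context.write(key, new IntWritable(max));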
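To make the point about the starting value concrete, here is a small stand-alone sketch. MaxInitDemo and maxStartingAt are hypothetical names that are not part of the question's code. It shows that a running maximum that starts at 0 never changes when every value is negative, while one that starts at Integer.MIN_VALUE does.

```java
// Sketch of why the starting value of max matters.
// MaxInitDemo and maxStartingAt are hypothetical names.
class MaxInitDemo {

    // Returns the largest element, or `start` if no element exceeds it.
    static int maxStartingAt(int start, int[] values) {
        int max = start;
        for (int v : values) {
            if (v > max) {
                max = v;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        int[] allNegative = {-3, -1, -7};
        System.out.println(maxStartingAt(0, allNegative));                 // 0
        System.out.println(maxStartingAt(Integer.MIN_VALUE, allNegative)); // -1
    }
}
```

Since the question says the consumption values are natural numbers, this is mainly a diagnostic: if max still appears unchanged with Integer.MIN_VALUE as the start, the cause is somewhere else.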

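EDIT: I just saw your Mapper class, and it has a lot of problems. The following code skips the first element in every mapper. Why?

    if (counter > 0) {

I guess you are getting something like this, right? "customer,2013-07-01 01:00:00,2,…" If that is the case and you are already filtering the values, you should declare your max variable as a local variable rather than in the mapper scope, where it affects multiple customers.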
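The effect of a max that lives in the mapper scope can be seen in a small Hadoop-free model. RunningMaxDemo and emitWithRunningMax are hypothetical names; the method mimics a mapper that updates a shared max for each record and emits the record whenever its value equals the max seen so far. Under that assumption, what gets emitted depends on the order of the records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a mapper that keeps max in a field and emits a
// record whenever its value equals the maximum seen so far.
class RunningMaxDemo {

    static List<Integer> emitWithRunningMax(int[] values) {
        int max = 0; // shared across all records, like a mapper field
        List<Integer> emitted = new ArrayList<>();
        for (int v : values) {
            if (v > max) {
                max = v;
            }
            if (v == max) { // true for every new running maximum
                emitted.add(v);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emitWithRunningMax(new int[] {3, 5, 5})); // [3, 5, 5]
        System.out.println(emitWithRunningMax(new int[] {5, 5, 3})); // [5, 5]
    }
}
```

In the first call the value 3 is emitted even though it is not the overall maximum, simply because it arrived first. A single pass with a running maximum cannot know the final maximum, so the comparison has to happen after all the values have been seen.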

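There are a lot of open questions around this. Could you explain the input of each mapper and what you want to do?

EDIT2: Based on your answer, I would try this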

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class AlicanteMapperC extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final int max = 5;
        private SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split(",");
            String customer = line[0];
            Date t;
            Date target;
            try {
                t = ft.parse(line[1]);
                // parsed inside the try block because parse() throws ParseException
                target = ft.parse("2013-07-01 01:00:00");
            } catch (ParseException e) {
                throw new RuntimeException("something wrong with the date!" + line[1]);
            }
            int consumption = Integer.parseInt(line[2]);

            //find customers in a certain date
            if (t.compareTo(target) == 0 && consumption == max) {
                context.write(new Text(customer), new IntWritable(consumption));
            }
        }
    }

And the reducer is pretty simple; it emits one record per customer

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class AlicanteReducerC extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            //We already know that it is 5
            context.write(key, new IntWritable(5));

            //If you want something different, for example to report a customer with
            //different values, you could iterate over the values like this
            //for (IntWritable val : values) {
            //    context.write(key, val);
            //}
        }
    }
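The parse-and-compare step that the mapper applies to each line can be checked on its own before running a job. This is a minimal sketch under the assumption that every line has the three comma-separated columns described in the question; RecordFilterDemo and matchesDate are hypothetical names.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical helper that mirrors the mapper's date filter outside Hadoop.
class RecordFilterDemo {

    // True when the line's timestamp (second column) equals targetDate.
    static boolean matchesDate(String line, String targetDate) {
        // In this pattern MM is the month and dd the day of month;
        // lowercase mm would mean minutes.
        SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String[] fields = line.split(",");
        try {
            Date recordTime = ft.parse(fields[1].trim());
            return recordTime.equals(ft.parse(targetDate));
        } catch (ParseException e) {
            throw new IllegalArgumentException("bad timestamp in: " + line, e);
        }
    }

    public static void main(String[] args) {
        String target = "2013-07-01 01:00:00";
        System.out.println(matchesDate("C11FA586258,2013-07-01 01:00:00,5", target)); // true
        System.out.println(matchesDate("C11FA586152,2015-09-01 15:22:22,3", target)); // false
    }
}
```

A new SimpleDateFormat is created per call here only to keep the sketch self-contained; inside a mapper a single instance per task, as in the code above, avoids the repeated allocation.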