The value of a global variable does not change after the loop
I am working on a Hadoop project. I want to find the customers of a certain day and then write the customers with the maximum consumption of that day. In my Reducer class, for some reason, the global variable max doesn't change its value after the for loop.
EDIT I want to find the customers with the maximum consumption on a certain day. I have managed to find the customers on the date I want, but I am facing a problem in my Reducer class. Here is the code:
EDIT #2 I already know that the values (the consumption) are natural numbers. So in my output file I want the customers of a certain day that have the maximum consumption.
EDIT #3 My input file consists of many rows of data. It has three columns: the customer's ID, the timestamp (yyyy-mm-DD HH:mm:ss) and the consumption.
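As an aside, a row in this layout can be parsed with plain String.split and SimpleDateFormat. Note that in Java the pattern has to be "yyyy-MM-dd HH:mm:ss" (which the Mapper code below already uses): in SimpleDateFormat, mm means minutes and MM means months. A minimal, standalone sketch with one sample row:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class RecordParse {
    public static void main(String[] args) throws ParseException {
        String record = "C11FA586148,2013-07-01 01:00:00,3";
        String[] fields = record.split(",");

        String customer = fields[0];                          // customer ID column
        SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date timestamp = ft.parse(fields[1]);                 // timestamp column
        int consumption = Integer.parseInt(fields[2]);        // consumption column

        System.out.println(customer + " " + consumption);     // prints: C11FA586148 3
    }
}
```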
Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class alicanteDriver {
    public static void main(String[] args) throws Exception {
        long t_start = System.currentTimeMillis();
        long t_end;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Alicante");
        job.setJarByClass(alicanteDriver.class);

        job.setMapperClass(alicanteMapperC.class);
        //job.setCombinerClass(alicanteCombiner.class);
        job.setPartitionerClass(alicantePartitioner.class);
        job.setNumReduceTasks(2);
        job.setReducerClass(alicanteReducerC.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/alicante_1y.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/alicante_output"));
        job.waitForCompletion(true);

        t_end = System.currentTimeMillis();
        System.out.println((t_end - t_start) / 1000);
    }
}
Mapper class
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class alicanteMapperC extends Mapper<LongWritable, Text, Text, IntWritable> {

    String Customer = new String();
    SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date t = new Date();
    IntWritable Consumption = new IntWritable();
    int counter = 0;
    // new vars
    int max = 0;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Date d2 = null;
        try {
            d2 = ft.parse("2013-07-01 01:00:00");
        } catch (ParseException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        if (counter > 0) {
            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line, ",");
            while (itr.hasMoreTokens()) {
                Customer = itr.nextToken();
                try {
                    t = ft.parse(itr.nextToken());
                } catch (ParseException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                Consumption.set(Integer.parseInt(itr.nextToken()));

                // sort out as many values as possible
                if (Consumption.get() > max) {
                    max = Consumption.get();
                }

                // find customers in a certain date
                if (t.compareTo(d2) == 0 && Consumption.get() == max) {
                    context.write(new Text(Customer), Consumption);
                }
            }
        }
        counter++;
    }
}
Reducer class
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.google.common.collect.Iterables;

public class alicanteReducerC extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int max = 0; // this var

        // declaration of Lists
        List<Text> l1 = new ArrayList<Text>();
        List<IntWritable> l2 = new ArrayList<IntWritable>();

        for (IntWritable val : values) {
            if (val.get() > max) {
                max = val.get();
            }
            l1.add(key);
            l2.add(val);
        }

        for (int i = 0; i < l1.size(); i++) {
            if (l2.get(i).get() == max) {
                context.write(key, new IntWritable(max));
            }
        }
    }
}
Some values from the input file
C11FA586148,2013-07-01 01:00:00,3
C11FA586152,2015-09-01 15:22:22,3
C11FA586168,2015-02-01 15:22:22,1
C11FA586258,2013-07-01 01:00:00,5
C11FA586413,2013-07-01 01:00:00,5
C11UA487446,2013-09-01 15:22:22,3
C11UA487446,2013-07-01 01:00:00,3
C11FA586148,2013-07-01 01:00:00,4
The output should be
C11FA586258 5
C11FA586413 5
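For reference, the expected result can be reproduced outside Hadoop with a two-pass scan over the sample rows: the first pass finds the maximum consumption on the target date, the second prints every customer that reached it. This plain-Java sketch (sample data copied from the question) is only meant to pin down the intended semantics, not the MapReduce implementation:

```java
public class MaxOnDate {
    public static void main(String[] args) {
        String[] rows = {
            "C11FA586148,2013-07-01 01:00:00,3",
            "C11FA586152,2015-09-01 15:22:22,3",
            "C11FA586168,2015-02-01 15:22:22,1",
            "C11FA586258,2013-07-01 01:00:00,5",
            "C11FA586413,2013-07-01 01:00:00,5",
            "C11UA487446,2013-09-01 15:22:22,3",
            "C11UA487446,2013-07-01 01:00:00,3",
            "C11FA586148,2013-07-01 01:00:00,4"
        };
        String targetDate = "2013-07-01 01:00:00";

        // pass 1: maximum consumption on the target date
        int max = Integer.MIN_VALUE;
        for (String row : rows) {
            String[] f = row.split(",");
            if (f[1].equals(targetDate)) {
                max = Math.max(max, Integer.parseInt(f[2]));
            }
        }

        // pass 2: emit every customer that hit the maximum
        for (String row : rows) {
            String[] f = row.split(",");
            if (f[1].equals(targetDate) && Integer.parseInt(f[2]) == max) {
                System.out.println(f[0] + " " + f[2]);
            }
        }
    }
}
```

Running it prints exactly the two expected lines, C11FA586258 5 and C11FA586413 5.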
I have been searching on forums for hours and still can't find the issue. Any ideas?
Here is the refactored code: you can pass/change the specific value for the consumption date. In this case you don't need a reducer. My first answer was querying the max consumption from the input; this answer queries a user-provided consumption from the input.
The setup method will get the user-provided value for mapper.maxConsumption.date and pass it to the map method. The cleanup method scans all max-consumption customers and writes the final max found in the input (5 in this case) – see the screenshot for a detailed execution log:
Run as:
hadoop jar maxConsumption.jar -Dmapper.maxConsumption.date="2013-07-01 01:00:00" Data/input.txt output/maxConsupmtion5
#input:
C11FA586148,2013-07-01 01:00:00,3
C11FA586152,2015-09-01 15:22:22,3
C11FA586168,2015-02-01 15:22:22,1
C11FA586258,2013-07-01 01:00:00,5
C11FA586413,2013-07-01 01:00:00,5
C11UA487446,2013-09-01 15:22:22,3
C11UA487446,2013-07-01 01:00:00,3
C11FA586148,2013-07-01 01:00:00,4

#output:
C11FA586258 5
C11FA586413 5
public class maxConsumption extends Configured implements Tool {

    public static class DataMapper extends Mapper
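The code block above is cut off after the mapper declaration (the rest of the listing did not survive). Based on the description in this answer (setup reads mapper.maxConsumption.date from the job configuration, map filters and tracks the running max, cleanup emits the final winners, no reducer), a reconstruction might look like the sketch below. This is not the answerer's original code; the class and variable names beyond those shown above are assumptions, and it needs the Hadoop classpath to compile and run:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class maxConsumption extends Configured implements Tool {

    public static class DataMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private String targetDate;                 // from -Dmapper.maxConsumption.date
        private int max = Integer.MIN_VALUE;
        private final StringBuilder maxCustomers = new StringBuilder();

        @Override
        protected void setup(Context context) {
            // the user-provided date arrives through the job configuration
            targetDate = context.getConfiguration().get("mapper.maxConsumption.date");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) {
            String[] f = value.toString().split(",");
            if (f.length == 3 && f[1].equals(targetDate)) {
                int consumption = Integer.parseInt(f[2]);
                if (consumption > max) {           // new maximum: restart the list
                    max = consumption;
                    maxCustomers.setLength(0);
                    maxCustomers.append(f[0]);
                } else if (consumption == max) {   // tie: remember this customer too
                    maxCustomers.append('\n').append(f[0]);
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // emit the final winners once this mapper has seen all of its input
            for (String customer : maxCustomers.toString().split("\n")) {
                context.write(new Text(customer), new IntWritable(max));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "maxConsumption");
        job.setJarByClass(maxConsumption.class);
        job.setMapperClass(DataMapper.class);
        job.setNumReduceTasks(0);                  // map-only job: no reducer needed
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new maxConsumption(), args));
    }
}
```

Using ToolRunner is what lets the -Dmapper.maxConsumption.date=... option from the command line above land in the job configuration. Note that with more than one mapper, each mapper's cleanup emits its own local maximum, so a single map task (or a small follow-up merge) is assumed here.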
Probably all the values getting to the reducer are less than 0. Try the minimum value to identify whether your variable changes.
max = Integer.MIN_VALUE;
According to what you say, the output should then be only 0 (the max in the reducers is 0) or no output at all (all values are less than 0). Also, look at this
context.write(key, new IntWritable());
It should be
context.write(key, new IntWritable(max));
EDIT: I just saw your Mapper class; it has a lot of problems. The following code is skipping the first element in every mapper. Why?
if (counter > 0) {
I guess you are getting something like this, right? "customer,2013-07-01 01:00:00,2,..." If that is the case, and you are already filtering the values, you should declare your max variable as local, not in the mapper scope; kept as a field, it mixes values across multiple customers.
There are a lot of problems around this... Could you explain the input of each mapper and what you want to do?
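The scope problem called out above can be shown without Hadoop at all. In this hypothetical sketch (names are illustrative, not from the question), a max kept as a field survives across calls, so a later record is compared against an earlier record's maximum and gets silently dropped:

```java
public class ScopeDemo {
    static int fieldMax = 0;  // simulates the max declared in the mapper scope

    // mimics the question's per-record filter with a shared field
    static boolean sharedFilter(int consumption) {
        if (consumption > fieldMax) {
            fieldMax = consumption;
        }
        return consumption == fieldMax;
    }

    public static void main(String[] args) {
        // three consecutive records, as successive map() calls would see them
        int[] consumptions = {3, 5, 4};
        for (int c : consumptions) {
            // the record with value 4 is rejected only because an earlier
            // record already raised fieldMax to 5
            System.out.println(c + " " + sharedFilter(c));
        }
    }
}
```

It prints "3 true", "5 true", "4 false": the third record fails the filter purely because of leftover state, which is the behavior a local variable would avoid.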
EDIT2: Based on your answer, I would try this
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AlicanteMapperC extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final int max = 5;
    private SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Date t;
        Date d2;
        String[] line = value.toString().split(",");
        String customer = line[0];
        try {
            t = ft.parse(line[1]);
            d2 = ft.parse("2013-07-01 01:00:00");
        } catch (ParseException e) {
            throw new RuntimeException("something wrong with the date! " + line[1]);
        }
        Integer consumption = Integer.parseInt(line[2]);

        // find customers on a certain date with the known max consumption
        if (t.compareTo(d2) == 0 && consumption == max) {
            context.write(new Text(customer), new IntWritable(consumption));
        }
    }
}
and the reducer is fairly simple, emitting one record per customer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AlicanteReducerC extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // We already know that it is 5
        context.write(key, new IntWritable(5));

        // If you want something different, for example to report customers
        // with different values, you could iterate over the values like this:
        //for (IntWritable val : values) {
        //    context.write(key, val);
        //}
    }
}