Java Hadoop:我如何创建捕获器作为输入文件并提供输出,即每个文件中的行数?

我是Hadoop的新手,我已经设法运行wordCount示例: http ://hadoop.apache.org/common/docs/r0.18.2/mapred_tutorial.html

假设我们有一个包含3个文件的文件夹。 我希望每个文件都有一个映射器,这个映射器只计算行数并将其返回到reducer。

然后,reducer将输入每个映射器的行数作为输入,并将所有3个文件中存在的行总数作为输出。

所以,如果我们有以下3个文件

input1.txt input2.txt input3.txt 

并且映射器返回:

 mapper1 -> [input1.txt, 3] mapper2 -> [input2.txt, 4] mapper3 -> [input3.txt, 9] 

减速器将输出

 3+4+9 = 16 

我在一个简单的java应用程序中完成了这个,所以我想在Hadoop中完成它。 我只有一台计算机,并希望尝试在伪分布式环境中运行。

我怎样才能实现这个目标? 我应该采取什么适当的措施?

我的代码应该在apache的示例中看起来像那样吗? 我将有两个静态类,一个用于mapper,一个用于reducer? 或者我应该有3个类,每个映射器一个?

如果你能指导我完成这个,我不知道如何做到这一点,我相信如果我设法编写一些代码来做这些东西,那么我将来能够编写更复杂的应用程序。

谢谢!

除了sa125的答案之外,你可以通过不为每个输入记录发出记录来大大提高性能,而只是在映射器中累积一个计数器,然后在mapper清理方法中,发出文件名和计数值:

 public class LineMapper extends Mapper { protected long lines = 0; @Override protected void cleanup(Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit) context.getInputSplit(); String filename = split.getPath().toString(); context.write(new Text(filename), new LongWritable(lines)); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { lines++; } } 

我注意到你使用的是0.18版本的文档。 这是1.0.2 (最新) 的链接 。

第一个建议 – 使用IDE(eclipse,IDEA等)。 填补空白真的很有帮助。

在实际的HDFS中,您无法知道文件的每个部分所在的位置(不同的计算机和群集)。 没有保证行X甚至与行Y驻留在同一磁盘上。也不能保证行X不会在不同的机器上分割(HDFS以块为单位分配数据,通常每块64Mb)。 这意味着您不能假设相同的映射器将处理整个文件。 您可以确保每个文件都由同一个reducer处理

由于reducer对于映射器发送的每个键都是唯一的,所以我这样做的方法是使用文件名作为映射器中的输出键。 此外,映射器的默认输入类是TextInputFormat ,这意味着每个映射器将自己接收整行(由LF或CR终止)。 然后,您可以从映射器中发出文件名和数字1(或其他任何内容,与计算无关)。 然后,在reducer中,您只需使用一个循环来计算接收文件名的次数:

在mapper的map函数中

 public static class Map extends Mapper { public void map(IntWritable key, Text value, Context context) { // get the filename InputSplit split = context.getInputSplit(); String fileName = split.getPath().getName(); // send the filename to the reducer, the value // has no meaning (I just put "1" to have something) context.write( new Text(fileName), new Text("1") ); } } 

在减速机的减速function

 public static class Reduce extends Reducer { public void reduce(Text fileName, Iterator values, Context context) { long rowcount = 0; // values get one entry for each row, so the actual value doesn't matter // (you can also get the size, I'm just lazy here) for (Text val : values) { rowCount += 1; } // fileName is the Text key received (no need to create a new object) context.write( fileName, new Text( String.valueOf( rowCount ) ) ); } } 

在司机/主要

您可以使用与wordcount示例相同的驱动程序 – 请注意,我使用了新的mapreduce API,因此您需要调整一些内容( Job而不是JobConf等)。 当我读到它时, 这真的很有帮助 。

请注意,您的MR输出将只是每个文件名及其行数:

 input1.txt 3 input2.txt 4 input3.txt 9 

如果您只想计算所有文件中的TOTAL行数,只需在所有映射器中发出相同的键(而不是文件名)。 这样,只有一个reducer可以处理所有行计数:

 // no need for filename context.write( new Text("blah"), new Text("1") ); 

您还可以链接一个处理每个文件行数的输出的作业,或者做其他花哨的东西 – 这取决于您。

我留下了一些样板代码,但基础知识就在那里。 一定要检查我,因为我从记忆中输入了大部分内容.. 🙂

希望这可以帮助!