How can I parse PDF files in a MapReduce program?
I want to parse PDF files in my Hadoop 2.2.0 program, and I found this. Following it, I have these three classes so far:

- `PDFWordCount`: the main class containing the map and reduce functions. (It is just like the native Hadoop wordcount example, except that I use my `PDFInputFormat` class instead of `TextInputFormat`.)
- `PDFRecordReader extends RecordReader`: this is where the main work happens. In particular, here is my `initialize` function, with extra output for illustration:

```java
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
        throws IOException, InterruptedException {
    System.out.println("initialize");
    System.out.println(genericSplit.toString());
    FileSplit split = (FileSplit) genericSplit;
    System.out.println("filesplit convertion has been done");
    final Path file = split.getPath();
    Configuration conf = context.getConfiguration();
    conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
    FileSystem fs = file.getFileSystem(conf);
    System.out.println("fs has been opened");
    start = split.getStart();
    end = start + split.getLength();
    System.out.println("going to open split");
    FSDataInputStream filein = fs.open(split.getPath());
    System.out.println("going to load pdf");
    PDDocument pd = PDDocument.load(filein);
    System.out.println("pdf has been loaded");
    PDFTextStripper stripper = new PDFTextStripper();
    in = new LineReader(new ByteArrayInputStream(
            stripper.getText(pd).getBytes("UTF-8")));
    start = 0;
    this.pos = start;
    System.out.println("init has finished");
}
```
(You can see my `System.out.println` calls, which I use for debugging.) This method fails at the cast of `genericSplit` to `FileSplit`. The last thing I see in the console is:

```
hdfs://localhost:9000/in:0+9396432
```

which is `genericSplit.toString()`.
- `PDFInputFormat extends FileInputFormat`: it just creates a `new PDFRecordReader` in its `createRecordReader` method.

I want to know what my mistake is. Do I need any additional classes?
Reading PDFs is not that difficult; you need to extend the class `FileInputFormat` as well as `RecordReader`. The `FileInputFormat` must not split PDF files, since they are binary files.
```java
public class PDFInputFormat extends FileInputFormat {

    @Override
    public RecordReader createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new PDFLineRecordReader();
    }

    // Do not allow to ever split PDF files, even if larger than HDFS block size
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}
```
The `RecordReader` then performs the actual reading (I use PDFBox to read the PDFs).
```java
public class PDFLineRecordReader extends RecordReader {

    private Text key = new Text();
    private Text value = new Text();

    private int currentLine = 0;
    private List lines = null;

    private PDDocument doc = null;
    private PDFTextStripper textStripper = null;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) split;
        final Path file = fileSplit.getPath();
        Configuration conf = context.getConfiguration();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream filein = fs.open(fileSplit.getPath());

        if (filein != null) {
            doc = PDDocument.load(filein);
            // Could the PDF be read?
            if (doc != null) {
                textStripper = new PDFTextStripper();
                String text = textStripper.getText(doc);
                lines = Arrays.asList(text.split(System.lineSeparator()));
                currentLine = 0;
            }
        }
    }

    // False ends the reading process
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new Text();
        }
        if (currentLine < lines.size()) {
            String line = lines.get(currentLine);
            key.set(line);
            value.set("");
            currentLine++;
            return true;
        } else {
            // All lines are read? -> end
            key = null;
            value = null;
            return false;
        }
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return (100.0f / lines.size() * currentLine) / 100.0f;
    }

    @Override
    public void close() throws IOException {
        // If done close the doc
        if (doc != null) {
            doc.close();
        }
    }
}
```
Hope this helps!
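The line-cursor pattern used by `nextKeyValue()` above can be exercised outside Hadoop. Below is a minimal pure-Java sketch of the same logic (class and method names are illustrative, not from the answer; no Hadoop or PDFBox types): each call advances one line and progress is the fraction of lines consumed, which is what the reader's `getProgress()` formula simplifies to.

```java
import java.util.Arrays;
import java.util.List;

// Mirrors the cursor logic of the record reader's nextKeyValue():
// advance one line per call, report progress as currentLine / lines.size().
public class LineCursor {
    private final List<String> lines;
    private int currentLine = 0;
    private String key;

    public LineCursor(String text) {
        this.lines = Arrays.asList(text.split(System.lineSeparator()));
    }

    public boolean nextKeyValue() {
        if (currentLine < lines.size()) {
            key = lines.get(currentLine);
            currentLine++;
            return true;
        }
        key = null; // all lines are read -> end
        return false;
    }

    public String getCurrentKey() {
        return key;
    }

    public float getProgress() {
        // Same value as (100.0f / lines.size() * currentLine) / 100.0f
        return (float) currentLine / lines.size();
    }

    public static void main(String[] args) {
        LineCursor c = new LineCursor("one" + System.lineSeparator() + "two");
        while (c.nextKeyValue()) {
            System.out.println(c.getCurrentKey() + " " + c.getProgress());
        }
    }
}
```

In the real reader the same cursor state is driven by the MapReduce framework, which calls `nextKeyValue()` until it returns false.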
```java
package com.sidd.hadoop.practice.pdf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.sidd.hadoop.practice.input.pdf.PdfFileInputFormat;
import com.sidd.hadoop.practice.output.pdf.PdfFileOutputFormat;

public class ReadPdfFile {

    public static class MyMapper extends Mapper {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // context.progress();
            context.write(key, value);
        }
    }

    public static class MyReducer extends Reducer {
        public void reduce(LongWritable key, Iterable values, Context context)
                throws IOException, InterruptedException {
            if (values.iterator().hasNext()) {
                context.write(key, values.iterator().next());
            } else {
                context.write(key, new Text(""));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Read Pdf");
        job.setJarByClass(ReadPdfFile.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(PdfFileInputFormat.class);
        job.setOutputFormatClass(PdfFileOutputFormat.class);
        removeDir(args[1], conf);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static void removeDir(String path, Configuration conf) throws IOException {
        Path output_path = new Path(path);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output_path)) {
            fs.delete(output_path, true);
        }
    }
}
```
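The driver above only passes lines through unchanged. The question's actual goal, a `PDFWordCount`, then reduces to tokenizing each line and summing counts per word. A pure-Java sketch of that map/reduce step (illustrative only, no Hadoop types; class name and tokenization regex are my own choices):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {
    // "map" phase: tokenize each line into words;
    // "reduce" phase: sum the occurrences per word, as the Hadoop job would.
    public static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("Hello world", "hello Hadoop")));
    }
}
```

In the real job the map step would emit `(word, 1)` pairs from each line the PDF record reader produces, and the reduce step would sum them per key.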