How can I parse PDF files in a MapReduce program?

I want to parse PDF files in my Hadoop 2.2.0 program. I found this, and following it, so far I have these three classes:

  1. PDFWordCount: the main class containing the map and reduce functions. (It is just like the native Hadoop wordcount example, except that it uses my PDFInputFormat class instead of TextInputFormat.)
  2. PDFRecordReader extends RecordReader, which is where the main work is done. In particular, I am including its initialize function here for illustration:

     public void initialize(InputSplit genericSplit, TaskAttemptContext context)
             throws IOException, InterruptedException {
         System.out.println("initialize");
         System.out.println(genericSplit.toString());
         FileSplit split = (FileSplit) genericSplit;
         System.out.println("filesplit convertion has been done");
         final Path file = split.getPath();
         Configuration conf = context.getConfiguration();
         conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
         FileSystem fs = file.getFileSystem(conf);
         System.out.println("fs has been opened");
         start = split.getStart();
         end = start + split.getLength();
         System.out.println("going to open split");
         FSDataInputStream filein = fs.open(split.getPath());
         System.out.println("going to load pdf");
         PDDocument pd = PDDocument.load(filein);
         System.out.println("pdf has been loaded");
         PDFTextStripper stripper = new PDFTextStripper();
         in = new LineReader(new ByteArrayInputStream(
                 stripper.getText(pd).getBytes("UTF-8")));
         start = 0;
         this.pos = start;
         System.out.println("init has finished");
     }

     (You can see my System.out.println calls, which I use for debugging. This method fails when casting genericSplit to FileSplit. The last thing I see in the console is:

     hdfs://localhost:9000/in:0+9396432 

     which is the output of genericSplit.toString().)

  3. PDFInputFormat extends FileInputFormat, which simply creates a new PDFRecordReader in its createRecordReader method (see the sketch just below this list).
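For illustration, here is a rough sketch of the kind of input format item 3 describes, assuming wordcount-style LongWritable/Text key and value types (the question does not show the actual type parameters):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class PDFInputFormat extends FileInputFormat<LongWritable, Text> {

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException, InterruptedException {
            // Hand every split to the custom PDF-aware record reader.
            // The key/value types here are an assumption based on the wordcount example.
            return new PDFRecordReader();
        }
    }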

I want to know what my mistake is.

Do I need any additional classes?

Reading PDFs is not that difficult; you need to extend the FileInputFormat class as well as a RecordReader. The FileInputFormat must not split PDF files, since they are binary files.

    public class PDFInputFormat extends FileInputFormat<Text, Text> {

        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException, InterruptedException {
            return new PDFLineRecordReader();
        }

        // Do not allow to ever split PDF files, even if larger than HDFS block size
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
    }

The RecordReader then performs the reading itself (I use PDFBox to read the PDFs).

    public class PDFLineRecordReader extends RecordReader<Text, Text> {

        private Text key = new Text();
        private Text value = new Text();
        private int currentLine = 0;
        private List<String> lines = null;

        private PDDocument doc = null;
        private PDFTextStripper textStripper = null;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) split;
            final Path file = fileSplit.getPath();
            Configuration conf = context.getConfiguration();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream filein = fs.open(fileSplit.getPath());

            if (filein != null) {
                doc = PDDocument.load(filein);

                // Could the PDF be read?
                if (doc != null) {
                    textStripper = new PDFTextStripper();
                    String text = textStripper.getText(doc);
                    lines = Arrays.asList(text.split(System.lineSeparator()));
                    currentLine = 0;
                }
            }
        }

        // False ends the reading process
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (key == null) {
                key = new Text();
            }
            if (value == null) {
                value = new Text();
            }

            if (currentLine < lines.size()) {
                String line = lines.get(currentLine);
                key.set(line);
                value.set("");
                currentLine++;
                return true;
            } else {
                // All lines are read? -> end
                key = null;
                value = null;
                return false;
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (100.0f / lines.size() * currentLine) / 100.0f;
        }

        @Override
        public void close() throws IOException {
            // If done, close the document
            if (doc != null) {
                doc.close();
            }
        }
    }
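For completeness, here is a minimal sketch (not part of the original answer) of how this input format could be wired into a wordcount-style driver; the mapper, reducer, and class names below are illustrative assumptions:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class PDFWordCount {

        // PDFLineRecordReader emits each extracted PDF line as the key and an empty value,
        // so the mapper tokenizes the key rather than the value.
        public static class WordMapper extends Mapper<Text, Text, Text, IntWritable> {
            private static final IntWritable ONE = new IntWritable(1);
            private final Text word = new Text();

            @Override
            protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
                for (String token : key.toString().split("\\s+")) {
                    if (!token.isEmpty()) {
                        word.set(token);
                        context.write(word, ONE);
                    }
                }
            }
        }

        public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable v : values) {
                    sum += v.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "pdf word count");
            job.setJarByClass(PDFWordCount.class);
            job.setInputFormatClass(PDFInputFormat.class); // the input format shown above
            job.setMapperClass(WordMapper.class);
            job.setReducerClass(SumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }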

Hope this helps!

    package com.sidd.hadoop.practice.pdf;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import com.sidd.hadoop.practice.input.pdf.PdfFileInputFormat;
    import com.sidd.hadoop.practice.output.pdf.PdfFileOutputFormat;

    public class ReadPdfFile {

        public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // context.progress();
                context.write(key, value);
            }
        }

        public static class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
            public void reduce(LongWritable key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                if (values.iterator().hasNext()) {
                    context.write(key, values.iterator().next());
                } else {
                    context.write(key, new Text(""));
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "Read Pdf");
            job.setJarByClass(ReadPdfFile.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(PdfFileInputFormat.class);
            job.setOutputFormatClass(PdfFileOutputFormat.class);
            removeDir(args[1], conf);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static void removeDir(String path, Configuration conf) throws IOException {
            Path output_path = new Path(path);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output_path)) {
                fs.delete(output_path, true);
            }
        }
    }
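This driver expects the PDF input directory as args[0] and the output directory as args[1]; removeDir clears any existing output directory before the job starts, and PdfFileInputFormat / PdfFileOutputFormat are the author's own classes from the com.sidd.hadoop.practice packages.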