Hadoop – 直接从Mapper写入HBase

我有一个haddop作业,它的输出应写入HBase。 我真的不需要reducer,我想插入的那种行在Mapper中确定。

我如何使用TableOutputFormat来实现这一目标? 从我看到的所有示例中,我们假设reducer是创建Put的那个,而TableMapper只是用于从HBase表中读取。

在我的情况下,输入是HDFS,输出是Put到特定的表,我在TableMapReduceUtil中找不到任何可以帮助我的东西。

有什么例子可以帮助我吗?

顺便说一下,我正在使用新的Hadoop API

这是从文件读取并将所有行放入Hbase的示例。 此示例来自“Hbase:权威指南”,您可以在存储库中找到它。 要在计算机上克隆回购:

git clone git://github.com/larsgeorge/hbase-book.git 

在本书中,您还可以找到有关代码的所有解释。 但如果你有些不可理解的事情,请随意提问。

 ` public class ImportFromFile { public static final String NAME = "ImportFromFile"; public enum Counters { LINES } static class ImportMapper extends Mapper { private byte[] family = null; private byte[] qualifier = null; @Override protected void setup(Context context) throws IOException, InterruptedException { String column = context.getConfiguration().get("conf.column"); byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); family = colkey[0]; if (colkey.length > 1) { qualifier = colkey[1]; } } @Override public void map(LongWritable offset, Text line, Context context) throws IOException { try { String lineString = line.toString(); byte[] rowkey = DigestUtils.md5(lineString); Put put = new Put(rowkey); put.add(family, qualifier, Bytes.toBytes(lineString)); context.write(new ImmutableBytesWritable(rowkey), put); context.getCounter(Counters.LINES).increment(1); } catch (Exception e) { e.printStackTrace(); } } } private static CommandLine parseArgs(String[] args) throws ParseException { Options options = new Options(); Option o = new Option("t", "table", true, "table to import into (must exist)"); o.setArgName("table-name"); o.setRequired(true); options.addOption(o); o = new Option("c", "column", true, "column to store row data into (must exist)"); o.setArgName("family:qualifier"); o.setRequired(true); options.addOption(o); o = new Option("i", "input", true, "the directory or file to read from"); o.setArgName("path-in-HDFS"); o.setRequired(true); options.addOption(o); options.addOption("d", "debug", false, "switch on DEBUG log level"); CommandLineParser parser = new PosixParser(); CommandLine cmd = null; try { cmd = parser.parse(options, args); } catch (Exception e) { System.err.println("ERROR: " + e.getMessage() + "\n"); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(NAME + " ", options, true); System.exit(-1); } return cmd; } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); CommandLine cmd = parseArgs(otherArgs); String table = cmd.getOptionValue("t"); String input = cmd.getOptionValue("i"); String column = cmd.getOptionValue("c"); conf.set("conf.column", column); Job job = new Job(conf, "Import from file " + input + " into table " + table); job.setJarByClass(ImportFromFile.class); job.setMapperClass(ImportMapper.class); job.setOutputFormatClass(TableOutputFormat.class); job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writable.class); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(input)); System.exit(job.waitForCompletion(true) ? 0 : 1); } }` 

您只需要使映射器输出该对。 OutputFormat仅指定如何持久保存输出键值。 它并不一定意味着关键值来自减速器。 你需要在mapper中做这样的事情:

 ... extends TableMapper() { ... ... context.write(, ); }