以编程方式将数据批量加载到HBase的最快方法是什么?

我有一个纯文本文件,可能有数百万行需要自定义解析,我想尽快加载到HBase表中(使用Hadoop或HBase Java客户端)。

我目前的解决方案是基于没有Reduce部分的MapReduce作业。 我使用FileInputFormat读取文本文件,以便将每一行传递给Mapper类的map方法。 此时,解析该行以形成写入contextPut对象。 然后, TableOutputFormat获取Put对象并将其插入表中。

该解决方案产生的平均插入速率为每秒1,000行,低于我的预期。 我的HBase设置在单个服务器上处于伪分布式模式。

一个有趣的事情是,在插入1,000,000行时,会产生25个Mappers(任务),但它们会连续运行(一个接一个); 这是正常的吗?

这是我当前解决方案的代码:

 public static class CustomMap extends Mapper { protected void map(LongWritable key, Text value, Context context) throws IOException { Map parsedLine = parseLine(value.toString()); Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1]))); for (String currentKey : parsedLine.keySet()) { row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey))); } try { context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public int run(String[] args) throws Exception { if (args.length != 2) { return -1; } conf.set("hbase.mapred.outputtable", args[1]); // I got these conf parameters from a presentation about Bulk Load conf.set("hbase.hstore.blockingStoreFiles", "25"); conf.set("hbase.hregion.memstore.block.multiplier", "8"); conf.set("hbase.regionserver.handler.count", "30"); conf.set("hbase.regions.percheckin", "30"); conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3"); conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15"); Job job = new Job(conf); job.setJarByClass(BulkLoadMapReduce.class); job.setJobName(NAME); TextInputFormat.setInputPaths(job, new Path(args[0])); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(CustomMap.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); job.setNumReduceTasks(0); job.setOutputFormatClass(TableOutputFormat.class); job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception { Long startTime = Calendar.getInstance().getTimeInMillis(); System.out.println("Start time : " + startTime); int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args); Long endTime = Calendar.getInstance().getTimeInMillis(); System.out.println("End time : " + endTime); System.out.println("Duration milliseconds: " + (endTime-startTime)); System.exit(errCode); } 

我经历了一个与你的过程非常相似的过程,试图找到一种有效的方法将数据从MR加载到HBase中。 我发现工作的是使用HFileOutputFormat作为MR的OutputFormatClass。

下面是我必须生成job代码的基础,以及写出数据的Mapper map函数。 这很快。 我们不再使用它了,所以我手边没有数字,但在一分钟之内就有大约250万条记录。

这是我编写的(剥离)函数,用于为我的MapReduce进程生成作业以将数据放入HBase

 private Job createCubeJob(...) { //Build and Configure Job Job job = new Job(conf); job.setJobName(jobName); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper job.setJarByClass(CubeBuilderDriver.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(HFileOutputFormat.class); TextInputFormat.setInputPaths(job, hiveOutputDir); HFileOutputFormat.setOutputPath(job, cubeOutputPath); Configuration hConf = HBaseConfiguration.create(conf); hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum); hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort); HTable hTable = new HTable(hConf, tableName); HFileOutputFormat.configureIncrementalLoad(job, hTable); return job; } 

这是我在HiveToHBaseMapper类中的map函数(稍加编辑)。

 public void map(WritableComparable key, Writable val, Context context) throws IOException, InterruptedException { try{ Configuration config = context.getConfiguration(); String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR); String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY); String column = strs[COLUMN_INDEX]; String Value = strs[VALUE_INDEX]; String sKey = generateKey(strs, config); byte[] bKey = Bytes.toBytes(sKey); Put put = new Put(bKey); put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) ? Bytes.toBytes(Double.MIN_VALUE) : Bytes.toBytes(value)); ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey); context.write(ibKey, put); context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1); } catch(Exception e){ context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1); } } 

我很确定这不会是一个复制和粘贴解决方案。 显然,我在这里使用的数据不需要任何自定义处理(在此之前在MR作业中完成)。 我想提供的主要是HFileOutputFormat 。 其余的只是我如何使用它的一个例子。 🙂
我希望它能让你走上一条通向良好解决方案的坚实道路。 :

一个有趣的事情是,在插入1,000,000行时,会产生25个Mappers(任务),但它们会连续运行(一个接一个); 这是正常的吗?

mapreduce.tasktracker.map.tasks.maximum参数默认为2确定可以在节点上并行运行的最大任务数。 除非更改,否则您应该看到每个节点上同时运行2个map任务。