Loading 1GB of data into HBase takes 1 hour

I want to load a 1GB (10 million records) CSV file into HBase. I wrote a MapReduce program for it. My code works fine, but it takes 1 hour to complete, and the last reducer alone takes more than half an hour. Could anyone please help me?

My code is as follows:

Driver.java


    package com.cloudera.examples.hbase.bulkimport;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * HBase bulk import example
     * Data preparation MapReduce job driver
     *
     * 1. args[0]: HDFS input path
     * 2. args[1]: HDFS output path
     * 3. args[2]: HBase table name
     */
    public class Driver {

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        /*
         * NBA Finals 2010 game 1 tip-off time (seconds from epoch)
         * Thu, 03 Jun 2010 18:00:00 PDT
         */
        // conf.setInt("epoch.seconds.tipoff", 1275613200);
        conf.set("hbase.table.name", args[2]);

        // Load hbase-site.xml
        HBaseConfiguration.addHbaseResources(conf);

        Job job = new Job(conf, "HBase Bulk Import Example");
        job.setJarByClass(HBaseKVMapper.class);

        job.setMapperClass(HBaseKVMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        job.setInputFormatClass(TextInputFormat.class);

        HTable hTable = new HTable(conf, args[2]);

        // Auto configure partitioner and reducer
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

        // Load generated HFiles into table
        // LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // loader.doBulkLoad(new Path(args[1]), hTable);
      }
    }
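(Note: after the job finishes, the generated HFiles still need to be loaded into the table; that is what the commented-out LoadIncrementalHFiles lines above are for. As far as I know, roughly the same thing can also be done from the command line, where the two arguments are the job's HDFS output path and the table name:

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs-output-path> <table-name>

)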

HColumnEnum.java


    package com.cloudera.examples.hbase.bulkimport;

    /**
     * HBase table columns for the 'srv' column family
     */
    public enum HColumnEnum {
      SRV_COL_employeeid("employeeid".getBytes()),
      SRV_COL_eventdesc("eventdesc".getBytes()),
      SRV_COL_eventdate("eventdate".getBytes()),
      SRV_COL_objectname("objectname".getBytes()),
      SRV_COL_objectfolder("objectfolder".getBytes()),
      SRV_COL_ipaddress("ipaddress".getBytes());

      private final byte[] columnName;

      HColumnEnum(byte[] column) {
        this.columnName = column;
      }

      public byte[] getColumnName() {
        return this.columnName;
      }
    }

HBaseKVMapper.java

    package com.cloudera.examples.hbase.bulkimport;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import au.com.bytecode.opencsv.CSVParser;

    /**
     * HBase bulk import example
     *
     * Parses Facebook and Twitter messages from CSV files and outputs
     * <ImmutableBytesWritable, KeyValue> pairs.
     *
     * The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
     * into the correct HBase table region.
     *
     * The KeyValue value holds the HBase mutation information (column family,
     * column, and value)
     */
    public class HBaseKVMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

      final static byte[] SRV_COL_FAM = "srv".getBytes();
      final static int NUM_FIELDS = 6;

      CSVParser csvParser = new CSVParser();
      int tipOffSeconds = 0;
      String tableName = "";

      // DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
      //     .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));

      ImmutableBytesWritable hKey = new ImmutableBytesWritable();
      KeyValue kv;

      /** {@inheritDoc} */
      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        Configuration c = context.getConfiguration();

        // tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
        tableName = c.get("hbase.table.name");
      }

      /** {@inheritDoc} */
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {

        /*if (value.find("Service,Term,") > -1) {
          // Skip header
          return;
        }*/

        String[] fields = null;

        try {
          fields = value.toString().split(","); //csvParser.parseLine(value.toString());
        } catch (Exception ex) {
          context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
          return;
        }

        if (fields.length != NUM_FIELDS) {
          context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
          return;
        }

        // Get game offset in seconds from tip-off
        /* DateTime dt = null;
        try {
          dt = p.parseDateTime(fields[9]);
        } catch (Exception ex) {
          context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
          return;
        }
        int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
        String offsetForKey = String.format("%04d", gameOffset);
        String username = fields[2];
        if (username.equals("")) {
          username = fields[3];
        }*/

        // Key: eg "1200:twitter:jrkinley"
        hKey.set(String.format("%s|%s|%s|%s|%s|%s", fields[0], fields[1],
            fields[2], fields[3], fields[4], fields[5]).getBytes());

        // Service columns
        if (!fields[0].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_employeeid.getColumnName(), fields[0].getBytes());
          context.write(hKey, kv);
        }
        if (!fields[1].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_eventdesc.getColumnName(), fields[1].getBytes());
          context.write(hKey, kv);
        }
        if (!fields[2].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_eventdate.getColumnName(), fields[2].getBytes());
          context.write(hKey, kv);
        }
        if (!fields[3].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_objectname.getColumnName(), fields[3].getBytes());
          context.write(hKey, kv);
        }
        if (!fields[4].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_objectfolder.getColumnName(), fields[4].getBytes());
          context.write(hKey, kv);
        }
        if (!fields[5].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_ipaddress.getColumnName(), fields[5].getBytes());
          context.write(hKey, kv);
        }

        context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);

        /*
         * Output number of messages per quarter and before/after game. This should
         * correspond to the number of messages per region in HBase
         */
        /* if (gameOffset < 0) {
          context.getCounter("QStats", "BEFORE_GAME").increment(1);
        } else if (gameOffset < 900) {
          context.getCounter("QStats", "Q1").increment(1);
        } else if (gameOffset < 1800) {
          context.getCounter("QStats", "Q2").increment(1);
        } else if (gameOffset < 2700) {
          context.getCounter("QStats", "Q3").increment(1);
        } else if (gameOffset < 3600) {
          context.getCounter("QStats", "Q4").increment(1);
        } else {
          context.getCounter("QStats", "AFTER_GAME").increment(1);
        }*/
      }
    }

Please help me improve the performance. If you have any alternative solution, please let me know.

My mapred-site.xml

    <configuration>
      <property><name>mapred.job.tracker</name><value>namenode:54311</value></property>
      <property><name>mapred.reduce.parallel.copies</name><value>20</value></property>
      <property><name>tasktracker.http.threads</name><value>50</value></property>
      <property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
      <property><name>mapred.tasktracker.map.tasks.maximum</name><value>4</value></property>
      <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>4</value></property>
      <property><name>mapred.map.tasks</name><value>4</value></property>
      <property><name>reduce.map.tasks</name><value>4</value></property>
      <property><name>mapred.job.shuffle.merge.percent</name><value>0.65</value></property>
      <property><name>mapred.task.timeout</name><value>1200000</value></property>
      <property><name>mapred.child.java.opts</name><value>-Xms1024M -Xmx2048M</value></property>
      <property><name>mapred.job.reuse.jvm.num.tasks</name><value>-1</value></property>
      <property><name>mapred.compress.map.output</name><value>true</value></property>
      <property><name>mapred.map.output.compression.codec</name><value>com.hadoop.compression.lzo.LzoCodec</value></property>
      <property><name>io.sort.mb</name><value>800</value></property>
      <property><name>mapred.child.ulimit</name><value>unlimited</value></property>
      <property><name>io.sort.factor</name><value>100</value><description>More streams merged at once while sorting files.</description></property>
      <property><name>mapreduce.admin.map.child.java.opts</name><value>-Djava.net.preferIPv4Stack=true</value></property>
      <property><name>mapreduce.admin.reduce.child.java.opts</name><value>-Djava.net.preferIPv4Stack=true</value></property>
      <property><name>mapred.min.split.size</name><value>0</value></property>
      <property><name>mapred.job.map.memory.mb</name><value>-1</value></property>
      <property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
    </configuration>

hbase-site.xml

    <configuration>
      <property>
        <name>hbase.rootdir</name>
        <value>hdfs://namenode:54310/hbase</value>
        <description>The directory shared by RegionServers.</description>
      </property>
      <property>
        <name>hbase.master</name>
        <value>slave:60000</value>
        <description>The host and port that the HBase master runs at.
          A value of 'local' runs the master and a regionserver in a single process.</description>
      </property>
      <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
        <description>The mode the cluster will be in. Possible values are
          false: standalone and pseudo-distributed setups with managed Zookeeper
          true: fully-distributed with unmanaged Zookeeper Quorum (see hbase-env.sh)</description>
      </property>
      <property>
        <name>hbase.zookeeper.quorum</name>
        <value>slave</value>
        <description>Comma separated list of servers in the ZooKeeper Quorum.
          For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
          By default this is set to localhost for local and pseudo-distributed modes
          of operation. For a fully-distributed setup, this should be set to a full
          list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh
          this is the list of servers which we will start/stop ZooKeeper on.</description>
      </property>
      <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
      </property>
      <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/home/hduser/work/zoo_data</value>
        <description>Property from ZooKeeper's config zoo.cfg.
          The directory where the snapshot is stored.</description>
      </property>
    </configuration>

Please help me so that I can improve the performance.

First of all, why do we need a MapReduce program to load data into HBase for such a small file (1GB)?

In my experience, I processed 5GB of JSON with Jackson streaming (I did not want to read all of the JSON into memory) and persisted it into HBase within 8 minutes by using a batching technique.

I used HBase Put with a batched list object of 100,000 records.

Below is the code snippet with which I achieved this. (The same can be done while parsing other formats as well.)

You will probably need to call this method in 2 places:

  1) For every batch of 100,000 records.

  2) For processing the remainder of your batch records that is smaller than 100,000 (see the usage sketch after the snippet below).

    public void addRecord(final ArrayList<Put> puts, final String tableName) throws Exception {
      try {
        final HTable table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
        table.put(puts);
        LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK.");
      } catch (final Throwable e) {
        e.printStackTrace();
      } finally {
        LOG.info("Processed ---> " + puts.size());
        if (puts != null) {
          puts.clear();
        }
      }
    }
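For reference, a rough sketch of how this method might be called at the two places above. This is illustrative only: `parseRecord` is a hypothetical helper that turns one input record into a `Put`, `records` stands for whatever streamed input is being parsed, and "mytable" is a placeholder table name.

    // Illustrative batching loop: flush every 100,000 Puts, then flush the remainder.
    final int BATCH_SIZE = 100000;
    final ArrayList<Put> puts = new ArrayList<Put>();

    for (String record : records) {          // 'records' = your streamed CSV/JSON input
      puts.add(parseRecord(record));         // parseRecord(): hypothetical Put builder
      if (puts.size() == BATCH_SIZE) {
        addRecord(puts, "mytable");          // 1) every full batch of 100,000
      }
    }
    if (!puts.isEmpty()) {
      addRecord(puts, "mytable");            // 2) the remainder smaller than 100,000
    }

Note that addRecord clears the list in its finally block, so the same ArrayList can be reused across batches.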

I created only the mapper class and used the HBase output format class. Now it takes 10 minutes. My network is very slow; that is why it was taking so long.
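For anyone who wants to try the same shortcut, here is a minimal sketch of what such a map-only job might look like, assuming the classic org.apache.hadoop.hbase.mapreduce.TableOutputFormat API. The table name "mytable", the CsvPutMapper class, its row-key choice, and the column layout are placeholders for illustration, not the original code.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class PutDriver {

      /** Illustrative mapper: one CSV line -> one Put (row key layout is a placeholder). */
      public static class CsvPutMapper
          extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          String[] f = value.toString().split(",");
          Put put = new Put(value.toString().getBytes());         // row key = whole line
          put.add("srv".getBytes(), "employeeid".getBytes(), f[0].getBytes());
          // ... add the remaining columns the same way ...
          context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableOutputFormat.OUTPUT_TABLE, "mytable");      // placeholder table name

        Job job = new Job(conf, "CSV to HBase via Put (map-only)");
        job.setJarByClass(PutDriver.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(CsvPutMapper.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        job.setOutputFormatClass(TableOutputFormat.class);
        job.setNumReduceTasks(0);                                 // map-only: no shuffle/sort, no reducer

        TableMapReduceUtil.addDependencyJars(job);
        job.waitForCompletion(true);
      }
    }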

It can be fine-tuned further by specifying the number of region splits to use when creating the HBase table, since the number of reducer instances for the bulk load also depends on the number of regions. This can be done with the following command:

    hbase org.apache.hadoop.hbase.util.RegionSplitter -c <number of regions> -f <column families> <table name> <split algorithm>

For the split algorithm, one can specify:

  • UniformSplit – treats keys as arbitrary bytes

  • HexStringSplit – treats keys as hexadecimal ASCII
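For example, a hypothetical pre-split of a table named mytable into 10 regions on the srv column family (both names chosen only for illustration) would look like:

    hbase org.apache.hadoop.hbase.util.RegionSplitter mytable HexStringSplit -c 10 -f srv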