扩展Hadoop的TableInputFormat以使用用于分发时间戳键的前缀进行扫描

我有一个hbase表,其密钥是一个带有一个字节随机前缀的时间戳,用于分发密钥,因此扫描不会热点。 我正在尝试扩展TableInputFormat以便我可以在带有范围的表上运行单个MapReduce,为所有256个可能的前缀添加前缀,以便扫描具有指定时间戳范围的所有范围。 我的解决方案不起作用,因为它总是似乎扫描最后一个前缀(127)256次。 必须在所有扫描中共享某些内容。

我的代码如下。 有任何想法吗?

 public class PrefixedTableInputFormat extends TableInputFormat { @Override public List getSplits(JobContext context) throws IOException { List splits = new ArrayList(); Scan scan = getScan(); byte startRow[] = scan.getStartRow(), stopRow[] = scan.getStopRow(); byte prefixedStartRow[] = new byte[startRow.length+1]; byte prefixedStopRow[] = new byte[stopRow.length+1]; System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length); System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length); for (int prefix = -128; prefix < 128; prefix++) { prefixedStartRow[0] = (byte) prefix; prefixedStopRow[0] = (byte) prefix; scan.setStartRow(prefixedStartRow); scan.setStopRow(prefixedStopRow); setScan(scan); splits.addAll(super.getSplits(context)); } return splits; } } 

  Configuration config = HBaseConfiguration.create(); Job job = new Job(config, "Aggregate"); job.setJarByClass(Aggregate.class); Scan scan = new Scan(); scan.setStartRow("20120630".getBytes()); scan.setStopRow("20120701".getBytes()); scan.setCaching(500); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob( "event", scan, Mapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, PrefixedTableInputFormat.class); TableMapReduceUtil.initTableReducerJob("event", Reducer.class, job); 

您将需要在每次迭代中制作分割的深层副本:

 for (int prefix = -128; prefix < 128; prefix++) { prefixedStartRow[0] = (byte) prefix; prefixedStopRow[0] = (byte) prefix; scan.setStartRow(prefixedStartRow); scan.setStopRow(prefixedStopRow); setScan(scan); for (InputSplit subSplit : super.getSplits(context)) { splits.add((InputSplit) ReflectionUtils.copy(conf, (TableSplit) subSplit, new TableSplit()); } }