如何使用spark处理一系列hbase行?
我正在尝试使用HBase作为spark的数据源。 因此,第一步是从HBase表创建RDD。 由于Spark使用hadoop输入格式,我可以通过创建rdd找到一种使用所有行的方法http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25 / lighting-a-spark-with-hbase但我们如何为范围扫描创建RDD?
欢迎所有建议。
以下是使用Spark in Spark的示例:
import java.io.{DataOutputStream, ByteArrayOutputStream} import java.lang.String import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Base64 def convertScanToString(scan: Scan): String = { val out: ByteArrayOutputStream = new ByteArrayOutputStream val dos: DataOutputStream = new DataOutputStream(out) scan.write(dos) Base64.encodeBytes(out.toByteArray) } val conf = HBaseConfiguration.create() val scan = new Scan() scan.setCaching(500) scan.setCacheBlocks(false) conf.set(TableInputFormat.INPUT_TABLE, "table_name") conf.set(TableInputFormat.SCAN, convertScanToString(scan)) val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) rdd.count
您需要将相关库添加到Spark类路径,并确保它们与您的Spark兼容。 提示:您可以使用hbase classpath
来查找它们。
你可以设置下面的conf
val conf = HBaseConfiguration.create()//need to set all param for habse conf.set(TableInputFormat.SCAN_ROW_START, "row2"); conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
这将仅为那些reocrds加载rdd
以下是使用TableMapReduceUtil.convertScanToString(Scan scan):
的Java示例TableMapReduceUtil.convertScanToString(Scan scan):
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; public class HbaseScan { public static void main(String ... args) throws IOException, InterruptedException { // Spark conf SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("My App"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); // Hbase conf Configuration conf = HBaseConfiguration.create(); conf.set(TableInputFormat.INPUT_TABLE, "big_table_name"); // Create scan Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); scan.setStartRow(Bytes.toBytes("a")); scan.setStopRow(Bytes.toBytes("d")); // Submit scan into hbase conf conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan)); // Get RDD JavaPairRDD source = jsc .newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); // Process RDD System.out.println(source.count()); } }
- 在Apache Spark中,我可以轻松地重复/嵌套SparkContext.parallelize吗?
- 为什么Apache Spark在客户端上执行filter
- Spark 1.6-无法在hadoop二进制路径中找到winutils二进制文件
- Spark – Java UDF返回多列
- 如何使用Java中的spark在Dataframe中用特定值替换空值?
- Spark 2.0.1写入错误:引起:java.util.NoSuchElementException
- 在google dataproc集群实例中的spark-submit上运行app jar文件
- Java中的“Lambdifying”scala函数
- 使用sc.textFile以递归方式从子目录中获取文件内容