Writing from Spark to HBase
The flow of my Spark program is as follows:
Driver -> create the HBase connection -> broadcast the HBase handle. The executors then fetch this handle and try to write to HBase.
In the driver program, I create the HBase Configuration object and the Connection object, and then broadcast it via the JavaSparkContext as follows:
SparkConf sparkConf = JobConfigHelper.getSparkConfig();
Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);
jsc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(Long.parseLong(batchDuration)));

Configuration hconf = HBaseConfiguration.create();
hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
UserGroupInformation.setConfiguration(hconf);

JavaSparkContext js = jsc.sparkContext();
Connection connection = ConnectionFactory.createConnection(hconf);
connectionbroadcast = js.broadcast(connection);
Inside the executor's call() method:
Table table = connectionbroadcast.getValue()
        .getTable(TableName.valueOf("gfttsdgn:FRESHHBaseRushi"));
Put p = new Put(Bytes.toBytes("row1"));
p.add(Bytes.toBytes("c1"), Bytes.toBytes("output"), Bytes.toBytes("rohan"));
table.put(p);
I get the following exception when trying to run in yarn-client mode:
17/03/02 09:19:38 ERROR yarn.ApplicationMaster: User class threw exception: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
conf (org.apache.hadoop.hbase.client.RpcRetryingCallerFactory)
rpcCallerFactory (org.apache.hadoop.hbase.client.AsyncProcess)
asyncProcess (org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation)
com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
conf (org.apache.hadoop.hbase.client.RpcRetryingCallerFactory)
rpcCallerFactory (org.apache.hadoop.hbase.client.AsyncProcess)
asyncProcess (org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1337)
    at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:639)
    at com.citi.fresh.core.driver.FreshDriver.main(FreshDriver.java:178)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.util.ConcurrentModificationException
    at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
    at java.util.Vector$Itr.next(Vector.java:1133)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    ... 28 more
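The trace shows the failure happens at `SparkContext.broadcast`: Kryo is trying to serialize the live `Connection` object (and the `Configuration` inside it), whose internal collections are being mutated concurrently, hence the `ConcurrentModificationException`. An HBase `Connection` is not meant to be serialized or broadcast at all. A common workaround, sketched below under the assumption that `dstream` is a `JavaDStream<String>` (a hypothetical name, not from the question), is to create the connection on the executors inside `foreachPartition` so nothing HBase-related ever crosses the driver/executor boundary:

```java
// Sketch, not the poster's exact code: open the HBase connection per
// partition on the executor instead of broadcasting it from the driver.
// Table and column names are taken from the question above.
dstream.foreachRDD(rdd -> rdd.foreachPartition(partition -> {
    Configuration hconf = HBaseConfiguration.create();
    hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
    hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));

    // Created locally on the executor, so Kryo never sees it.
    try (Connection connection = ConnectionFactory.createConnection(hconf);
         Table table = connection.getTable(
                 TableName.valueOf("gfttsdgn:FRESHHBaseRushi"))) {
        List<Put> puts = new ArrayList<>();
        while (partition.hasNext()) {
            Put p = new Put(Bytes.toBytes(partition.next()));
            // addColumn on recent HBase clients; older ones use add(...)
            p.addColumn(Bytes.toBytes("c1"), Bytes.toBytes("output"),
                    Bytes.toBytes("rohan"));
            puts.add(p);
        }
        table.put(puts); // one batched write per partition
    }
}));
```

Opening one connection per partition (rather than per record) keeps the connection-setup cost amortized over the whole batch.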
I can see that you are trying to bulk-put data into HBase with Spark. As @jojo_Berlin explained, your HBase Configuration is not thread-safe. However, you can achieve this easily with SparkOnHBase:
Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));

JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.bulkPut(rdd,
        TableName.valueOf("gfttsdgn:FRESHHBaseRushi"),
        new PutFunction(),
        true);
where your 'put' function is:
public static class PutFunction implements Function<String, Put> {
    public Put call(String v) throws Exception {
        Put put = new Put(Bytes.toBytes(v));
        put.add(Bytes.toBytes("c1"), Bytes.toBytes("output"),
                Bytes.toBytes("rohan"));
        return put;
    }
}
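If pulling in the SparkOnHBase (hbase-spark) module is not an option, the stock Hadoop `TableOutputFormat` can do the same job with plain Spark. This is a sketch under the assumption that `rdd` is the same `JavaRDD<String>` used above; the job wiring itself is illustrative, not code from the question:

```java
// Sketch: write to HBase via TableOutputFormat, so no Connection object
// is ever created on (or broadcast from) the driver.
Configuration hconf = HBaseConfiguration.create();
hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
hconf.set(TableOutputFormat.OUTPUT_TABLE, "gfttsdgn:FRESHHBaseRushi");

Job job = Job.getInstance(hconf);
job.setOutputFormatClass(TableOutputFormat.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);

// Turn each String into a (rowkey, Put) pair; the output format opens
// its own HBase connections on the executors.
rdd.mapToPair(v -> {
    Put put = new Put(Bytes.toBytes(v));
    put.addColumn(Bytes.toBytes("c1"), Bytes.toBytes("output"),
            Bytes.toBytes("rohan"));
    return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(v)), put);
}).saveAsNewAPIHadoopDataset(job.getConfiguration());
```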