从Storm bolt中将行插入HBase

我希望能够从分布式(非本地)Storm拓扑中将新条目写入HBase。 存在一些GitHub项目,它们提供HBase Mappers或预先制作的Storm bolt来将元组写入HBase。 这些项目提供了在LocalCluster上执行样本的说明。

我遇到这两个项目并直接从bolt中访问HBase API的问题是它们都需要将HBase-site.xml文件包含在类路径中。 使用直接API方法,也可能使用GitHub方法,当您执行HBaseConfiguration.create(); 它将尝试从类路径上的条目中查找所需的信息。

如何修改storm bolt的类路径以包含Hbase配置文件?

更新:使用danehammer的答案,这就是我的工作方式

将以下文件复制到〜/ .storm目录中:

  • HBase的-共0.98.0.2.1.2.0-402-hadoop2.jar
  • HBase的-site.xml中
  • storm.yaml:注意:如果你没有将storm.yaml复制到该目录中,那么storm jar命令将不会在类路径中使用该目录(请参阅storm.py python脚本以查看自己的逻辑 – 如果这被记录在案)

接下来,在拓扑类的main方法中获取HBase配置并对其进行序列化:

 final Configuration hbaseConfig = HBaseConfiguration.create(); final DataOutputBuffer databufHbaseConfig = new DataOutputBuffer(); hbaseConfig.write(databufHbaseConfig); final byte[] baHbaseConfigSerialized = databufHbaseConfig.getData(); 

通过构造函数将字节数组传递给spout类。 spout类将此字节数组保存到字段中(不要在构造函数中反序列化。我发现如果spout有一个Configuration字段,你将在运行拓扑时得到一个无法序列化的exception)

在spout的open方法中,反序列化配置并访问hbase表:

 Configuration hBaseConfiguration = new Configuration(); ByteArrayInputStream bas = new ByteArrayInputStream(baHbaseConfigSerialized); hBaseConfiguration.readFields(new DataInputStream(bas)); HTable tbl = new HTable(hBaseConfiguration, HBASE_TABLE_NAME); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("YOUR_COLUMN")); scnrTbl = tbl.getScanner(scan); 

现在,在您的nextTuple方法中,您可以使用扫描程序获取下一行:

 Result rsltWaveform = scnrWaveformTbl.next(); 

从结果中提取您想要的内容,并将某些可序列化对象中的值传递给螺栓。

使用“storm jar”命令部署拓扑时, ~/.storm文件夹将位于类路径上(请参阅jar命令下的此链接 )。 如果您将hbase-site.xml文件(或相关的* -site.xml文件)放在该文件夹中, HBaseConfiguration.create()在“storm jar”期间HBaseConfiguration.create()将找到该文件并正确返回org.apache.hadoop.configuration.Configuration 。 这需要在拓扑中存储和序列化,以便在集群中分发该配置。