Tag: apache storm

My Storm topology neither works (no output) nor fails (no errors or exceptions)

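I have a topology in which I am trying to count occurrences of the words generated by SimulatorSpout (not a real stream) and then write the counts to a MySQL database table. The table schema is very simple:
Field | Type | …
ID | int(11) | Auto_icr
word | varchar(50) |
count | int(11) |
But I am facing a strange problem (as mentioned in the title). I successfully submitted the topology to my Storm cluster, which consists of 4 supervisors, and I can see the topology's flow in the Storm web UI (no exceptions), but when I checked the MySQL table, to my surprise, the table was empty… Any comments or suggestions are welcome… Here are the spout and bolts:
public class MySQLConnection { private static Connection conn = null; private static String dbUrl = "jdbc:mysql://192.168.0.2:3306/test?"; private static String dbClass = "com.mysql.jdbc.Driver"; public static Connection getConnection() throws SQLException, ClassNotFoundException […]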

How to debug Apache Storm in local cluster/mode through Eclipse

Using the following Q&A, I managed to enable debugging through Eclipse on an Apache Storm cluster (running locally): "How to debug Apache Storm in Eclipse?" My conf/storm.yaml has the following line to enable debugging on the worker nodes:
worker.childopts: "-agentlib:jdwp=transport=dt_socket,address=8000,server=y,suspend=y"
When I submit a topology to Storm to run (in a cluster), I can set breakpoints and inspect variables in my editor. But when I try to run it locally (in local mode), I cannot connect through Eclipse (connection refused).
# I'm using storm crawler, I submit a topology like so:
storm jar target/storm-crawler-core-10.6-SNAPSHOT-jar-with-dependencies.jar \
  com.digitalpebble.storm.crawler.CrawlTopology \
  -conf crawler-conf.yaml \
  -local
# ^ The `-local` runs it in a `LocalCluster`
# If I submit it to my actual cluster (without -local), I can […]

Apache Storm remote topology submission

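I have been testing remote submission of Storm topologies from an IDE (Eclipse). I successfully uploaded a simple Storm topology to a remote Storm cluster, but strangely, when I checked the Storm UI to confirm that the remotely submitted topology was working, I saw only the _acker bolt in the UI; the other bolts and the spout were missing. Afterwards I submitted the topology manually from the command line and checked the Storm UI again, and it worked as it should, with no problems. I have been looking for the cause but cannot find it. I have attached the topology and the remote submitter class below, along with the corresponding Storm UI screenshots. Here is the Eclipse console output (after the remote submission):
225 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar T:\STORM_TOPOLOGIES\Benchmark.jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar
234 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar
Here is the topology:
public class StormBenchmark { // ****************************************************************************************** public static class GenSpout extends BaseRichSpout { //private static final Logger […]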

Unable to run the storm-starter topologies from the Storm tutorial

While following the guide in the storm-starter repo, I cannot actually run any topology, such as ExclamationTopology. mvn clean install -DskipTests=true ran successfully when executed from the top-level Storm repo, as did mvn package at the storm-examples level. When I try to run storm jar target/storm-starter-2.0.0-SNAPSHOT.jar org.apache.storm.starter.ExclamationTopology, I get the error message:
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/storm/topology/IRichSpout
I am running OS X, Java version:
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed […]

Inserting rows into HBase from a Storm bolt

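I would like to be able to write new entries into HBase from a distributed (non-local) Storm topology. There are a few GitHub projects that provide either HBase mappers or pre-made Storm bolts for writing tuples into HBase. These projects provide instructions for running their samples on a LocalCluster. The problem I run into with both of these projects, and with accessing the HBase API directly from a bolt, is that they all require the hbase-site.xml file to be on the classpath. With the direct API approach, and perhaps with the GitHub ones too, when you execute HBaseConfiguration.create(); it tries to find the information it needs from an entry on the classpath. How can I modify the classpath of the Storm bolts to include the HBase configuration file?
Update: using danehammer's answer, this is how I got it working.
Copy the following files into the ~/.storm directory: hbase-common-0.98.0.2.1.2.0-402-hadoop2.jar, hbase-site.xml, and storm.yaml. NOTE: if you do not copy storm.yaml into that directory, the storm jar command will NOT use that directory in the classpath (see the storm.py Python script to check that logic for yourself; it would be nice if this were documented).
Next, in the main method of your topology class, get the HBase configuration and serialize it:
final Configuration hbaseConfig = HBaseConfiguration.create();
final DataOutputBuffer databufHbaseConfig = new DataOutputBuffer();
hbaseConfig.write(databufHbaseConfig);
final byte[] baHbaseConfigSerialized = databufHbaseConfig.getData();
Pass the byte array to your spout class through the constructor. The spout class saves this byte array to a field. (Do not deserialize it in the constructor: I found that if the spout has a Configuration field, you get a cannot-serialize exception when running the topology.)
In the spout's open method, deserialize the configuration and access the HBase table:
Configuration hBaseConfiguration = new Configuration();
ByteArrayInputStream bas = new ByteArrayInputStream(baHbaseConfigSerialized);
hBaseConfiguration.readFields(new DataInputStream(bas));
HTable […]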

Apache Storm bolt receiving multiple input tuples from different spouts/bolts

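Is it possible for a bolt to receive multiple input tuples from different spouts/bolts? For example, Bolt C receives input tuples from Spout A and also receives tuples from Bolt B to process. How should I implement this? I mean writing the Java code for Bolt C as well as its topology.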

Apache Storm Trident和Kafka Spout Integration

I cannot find good documentation on properly integrating Kafka with Apache Storm Trident. I tried looking through previously posted related questions here, but they did not contain enough information. I want to connect Trident to Kafka using OpaqueTridentKafkaSpout. Here is the sample code that is currently working:
GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(properties.getProperty("topic", "mytopic"));
Broker brokerForPartition0 = new Broker("IP1",9092);
Broker brokerForPartition1 = new Broker("IP2", 9092);
Broker brokerForPartition2 = new Broker("IP3:9092");
globalPartitionInformation.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
globalPartitionInformation.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
globalPartitionInformation.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts staticHosts = new StaticHosts(globalPartitionInformation);
TridentKafkaConfig […]

Duplicate tuples in a Storm cluster

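I am currently working on a project in which I have set up a Storm cluster across four Unix hosts. The topology itself is as follows: the JMS Spout listens to an MQ for new messages; the JMS Spout parses each message and emits the result to the Esper Bolt; the Esper Bolt processes the event and emits the result to the JMS Bolt; the JMS Bolt then publishes the message back onto the MQ on a different topic.
I realize that Storm is an "at-least-once" framework. However, if I receive 5 events and pass them to the Esper Bolt for counting, then for some reason I receive 5 count results in the JMS Bolt (all with the same value). Ideally I would like to receive a single result; is there any way to tell Storm to ignore duplicate tuples? I think this has something to do with the parallelism I have set up, because it works as expected when I have only one thread:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(),2).setNumTasks(2);
builder.setBolt("esperBolt", new EsperBolt.Builder().build(),6).setNumTasks(6) .fieldsGrouping(JMS_DATA_SPOUT,new Fields("eventGrouping"));
builder.setBolt("jmsBolt", new JMSBolt(),2).setNumTasks(2).fieldsGrouping("esperBolt", new Fields("eventName"));
I have also looked at Trident for its "exactly-once" semantics, but I am not fully convinced that it would solve this issue.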
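One common way to tolerate redelivered tuples under at-least-once processing is to have the receiving bolt skip IDs it has already handled. The following is a minimal sketch in plain Java, not a Storm API: the class name SeenIds, the method firstTime, and the assumption that every event carries a unique string ID are all hypothetical. It only helps if every copy of an event reaches the same bolt task (for example through a fields grouping on that ID), and it does nothing about results that are legitimately emitted once per input event.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Bounded record of recently seen event IDs (hypothetical helper, not part of Storm). */
final class SeenIds {
    private final Map<String, Boolean> seen;

    SeenIds(final int capacity) {
        // Insertion-ordered map that evicts its oldest entry once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is offered, false for a repeat still remembered. */
    boolean firstTime(String eventId) {
        return seen.put(eventId, Boolean.TRUE) == null;
    }
}
```

A bolt could call firstTime(id) at the top of execute and simply ack the tuple without emitting when it returns false. The capacity bounds memory use at the cost of forgetting IDs once enough newer ones have arrived.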

Running a Trident topology in a Storm TrackedTopology unit test

How can I run a JUnit test of a Trident topology so that tuples are allowed to flow through the topology while I test and verify the output at each stage? I have tried running it within Storm's testing framework, but that falls short of allowing verification and consistent execution of Trident. Here is an example topology with some inline comments where I am having the most trouble.
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;
import storm.trident.testing.Split;
import backtype.storm.Config;
import backtype.storm.ILocalCluster;
import backtype.storm.Testing;
import backtype.storm.testing.FeederSpout;
import backtype.storm.testing.TestJob;
import backtype.storm.testing.TrackedTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class WordCountTopologyTest { @Test public void testWordCountTopology() throws Exception { Testing.withTrackedCluster(new WordCountTestJob()); } public class WordCountTestJob […]

Storm topology does not submit

I configured my machine so that ZooKeeper, Nimbus, and the supervisor are running properly, and my topology works in LocalCluster:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("SendPost", conf, builder.createTopology());
Utils.sleep(10000000000l);
cluster.killTopology("SendPost");
cluster.shutdown();
Now I want to try submitting my topology, but it does not work:
/usr/local/storm/bin$ ./storm jar /home/winoria/Desktop/Storm/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.winoria.post.PostTopology Post
I get the following error:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/storm/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/winoria/Desktop/Storm/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
Running: java -client -Dstorm.options= -Dstorm.home=/usr/local/storm -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local
/storm/storm-netty-0.9.0.1.jar:/usr/local/storm/storm-console-logging-0.9.0.1.jar:/usr/local/storm/storm-core-0.9.0.1.jar:/usr/local/storm/lib/httpcore-4.1.jar:/usr/local/storm/lib/carbonite-1.5.0.jar:/usr/local/storm/lib/mockito-all-1.9.5.jar:/usr/local/storm/lib/commons-io-1.4.jar:/usr/local/storm/lib/commons-fileupload-1.2.1.jar:/usr/local/storm/lib/jgrapht-0.8.3.jar:/usr/local/storm/lib/ring-jetty-adapter-0.3.11.jar:/usr/local/storm/lib/jzmq-2.1.0.jar:/usr/local/storm/lib/asm-4.0.jar:/usr/local/storm/lib/logback-core-1.0.6.jar:/usr/local/storm/lib/tools.nrepl-0.2.3.jar:/usr/local/storm/lib/compojure-1.1.3.jar:/usr/local/storm/lib/json-simple-1.1.jar:/usr/local/storm/lib/ring-devel-0.3.11.jar:/usr/local/storm/lib/commons-logging-1.1.1.jar:/usr/local/storm/lib/httpclient-4.1.1.jar:/usr/local/storm/lib/reflectasm-1.07-shaded.jar:/usr/local/storm/lib/commons-exec-1.1.jar:/usr/local/storm/lib/guava-13.0.jar:/usr/local/storm/lib/clout-1.0.1.jar:/usr/local/storm/lib/objenesis-1.2.jar:/usr/local/storm/lib/slf4j-api-1.6.5.jar:/usr/local/storm/lib/clojure-1.4.0.jar:/usr/local/storm/lib/jetty-6.1.26.jar:/usr/local/storm/lib/hiccup-0.3.6.jar:/usr/local/storm/lib/clj-stacktrace-0.2.2.jar:/usr/local/storm/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/storm/lib/tools.logging-0.2.3.jar:/usr/local/storm/lib/ring-core-1.1.5.jar:/usr/local/storm/lib/zookeeper-3.3.3.jar:/usr/local/storm/lib/math.numeric-tower-0.0.1.jar:/usr/local/storm/lib/disruptor-2.10.1.jar:/usr/local/storm/lib/minlog-1.2.jar:/usr/local/storm/lib/core.incubator-0.1.0.jar:/usr/local/storm/lib/servlet-api-2.5-20081211.jar:/usr/local/storm/lib/netty-3.6.3.Final.jar:/usr/local/storm/lib/ring-servlet-0.3.11.jar:/usr/local/storm/lib/clj-time-0.4.1.jar:/usr/local/storm/lib/snakeyaml-1.11.jar:/usr/local/storm/lib/commons-codec-1.4.jar:/usr/local/storm/lib/tools.cli-0.2.2.jar:/usr/local/storm/lib/logback-classic-1.0.6.jar:/usr/local/storm/lib/servlet-api-2.5.jar:/usr/local/storm/lib/kryo-2.17.jar:/usr/local/storm/lib/joda-ti
me-2.0.jar:/usr/local/storm/lib/curator-client-1.0.1.jar:/usr/local/storm/lib/libthrift7-0.7.0-2.jar:/usr/local/storm/lib/tools.macro-0.1.0.jar:/usr/local/storm/lib/jline-0.9.94.jar:/usr/local/storm/lib/clojure-complete-0.2.3.jar:/usr/local/storm/lib/curator-framework-1.0.1.jar:/usr/local/storm/lib/commons-lang-2.5.jar:/usr/local/storm/lib/junit-3.8.1.jar:/usr/local/storm/lib/jetty-util-6.1.26.jar:/home/winoria/Desktop/Storm/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/usr/local/storm/conf:/usr/local/storm/bin -Dstorm.jar=/home/winoria/Desktop/Storm/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.winoria.post.PostTopology […]