Tag: apache storm

Storm KafkaSpout停止使用来自Kafka Topic的消息

我的问题是Storm KafkaSpout在一段时间后停止使用来自Kafka主题的消息。 在storm中启用调试时,我得到如下日志文件: 2016-07-05 03:58:26.097 oasdtask [INFO] Emitting:packet_spout __metrics [#object [org.apache.storm.metric.api.IMetricsConsumer $ TaskInfo 0x2c35b34f“org.apache.storm.metric.api.IMetricsConsumer $ TaskInfo @ 2c35b34f“] [#object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x798f1e35”[__ack-count = {default = 0}]“] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x230867ec“[__sendqueue = {sojourn_time_ms = 0.0,write_pos = 5411461,read_pos = 5411461,overflow = 0,arrival_rate_secs = 0.0,capacity = 1024,population = 0}]”] #object [org.apache.storm.metric。 api.IMetricsConsumer $ DataPoint 0x7cdec8eb“[__ complete-latency […]

如何使用Log4j和Storm Framework将日志写入文件?

我在使用log4j在风暴中记录到文件时遇到了一些问题。 在提交我的拓扑之前,即在我的main方法中,我编写了一些日志语句并使用以下命令配置了记录器: PropertyConfigurator.configure(myLog4jProperties) 现在,当我在eclipse中使用我的可执行jar运行我的拓扑时 – 它正常工作并且正在创建日志文件。 要么 当我使用“java -jar MyJarFile someOtherOptions”运行我的可执行jar时,我可以看到log4j正在配置并且文件正确形成并且在两个文件和控制台上都进行了日志记录(在我的log4j.properties中定义) 但是,当我使用“风暴jar MyJarFile MyMainClass someOtherOptions”运行相同的jar时,它无法创建并登录到除控制台之外的任何文件。 我正在谈论我在提交拓扑之前打印的日志。 有没有办法在使用storm时将我的语句记录在文件中? 我不一定要使用org.apache.log4j 。

风暴重播元组哪个处理已超时?

在风暴文档中提到,风暴重放了处理已经超时的元组。 我的问题是风暴是否自动执行此操作(不调用原始spout上的fail())或者是原始spout的重要责任是重放元组(调用fail()并且重放应该在内部实现,甚至在外部实现)?

如何从Storm中禁用/关闭日志记录function

我想在从本地群集运行时关闭默认提供的日志记录function。 目前它在控制台上记录了很多信息。 以下是日志示例: 261 [main] INFO backtype.storm.daemon.task – Shut down task Getting-Started-Toplogie-1-1376388324:2 2261 [main] INFO backtype.storm.daemon.task – Shutting down task Getting-Started-Toplogie-1-1376388324:1 2262 [Thread-24] INFO backtype.storm.util – Async loop interrupted! 2276 [main] INFO backtype.storm.daemon.task – Shut down task Getting-Started-Toplogie-1-1376388324:1 2278 [main] INFO backtype.storm.daemon.worker – Terminating zmq context 2279 [main] INFO backtype.storm.daemon.worker – Disconnecting from storm cluster […]

Storm-Kafka多个鲸鱼喷水,如何分担负荷?

我试图在多个喷口之间分享任务。 我有一种情况,我从外部源一次得到一个元组/消息,我想要有多个spout实例,主要目的是分担负载并提高性能效率。 我可以用一个Spout本身做同样的事情,但我想分担多个喷口的负载。 我无法获得分散负载的逻辑。 由于消息的偏移在特定喷口完成消耗部件之前将不会被知道(即,基于设置的缓冲器大小)。 任何人都可以对如何计算逻辑/算法有所启发吗? 提前谢谢你的时间。 更新以回答答案: 现在在Kafka上使用多分区(即5 ) 以下是使用的代码: builder.setSpout(“spout”, new KafkaSpout(cfg), 5); 通过在每个分区上使用800 MB数据进行泛洪测试,完成读取需要~22 sec 。 再次,使用parallelism_hint = 1的代码 即builder.setSpout(“spout”, new KafkaSpout(cfg), 1); 现在花了更多~23 sec ! 为什么? 根据Storm Docs的 setSpout()声明如下: public SpoutDeclarer setSpout(java.lang.String id, IRichSpout spout, java.lang.Number parallelism_hint) 哪里, parallelism_hint – 是执行此spout应分配的任务数。 每个任务都将在群集周围某个进程中的某个线程上运行。

如何在Intellij IDEA中构建和运行Storm拓扑

我按照Storm Starter说明并在IntelliJ中导入了Twitter Storm。 为了测试我编辑了ExclaimationToplogy并使用了Maven命令来构建和运行它: mvn -f m2-pom.xml compile exec:java -Dstorm.topology=storm.starter.ExclamationTopology 但我更感兴趣的是在IDE中而不是从命令行构建和运行。 我需要采取什么行动? 谢谢

物理内存使用率太高

当我尝试使用storm使用本地模式运行拓扑时,我收到此错误 mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=my.Topology 错误是 ERROR backtype.storm.util – Async loop died! java.lang.OutOfMemoryError: Physical memory usage is too high: physicalBytes = 3G > maxPhysicalBytes = 3G 我该如何解决? 我不知道应该增加哪个物理内存! 如果我在生产模式下运行拓扑,这个错误会消失吗? UPDATE Physical Memory Array Location: System Board Or Motherboard Use: System Memory Error Correction Type: None Maximum Capacity: 32 GB Error Information Handle: 0x0019 Number […]

构建Storm时Zookeeper的ClassNotFoundException

我是java和Storm的新手所以请原谅任何明显的错误。 我正在尝试使用水槽连接器运行风暴,但它崩溃时出现以下错误: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.ExceptionInInitializerError at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at clojure.lang.RT.loadClassForName(RT.java:2056) at clojure.lang.RT.load(RT.java:419) at clojure.lang.RT.load(RT.java:400) at clojure.core$load$fn__4890.invoke(core.clj:5415) at clojure.core$load.doInvoke(core.clj:5414) at clojure.lang.RestFn.invoke(RestFn.java:408) at clojure.core$load_one.invoke(core.clj:5227) at clojure.core$load_lib.doInvoke(core.clj:5264) at clojure.lang.RestFn.applyTo(RestFn.java:142) at clojure.core$apply.invoke(core.clj:603) at clojure.core$load_libs.doInvoke(core.clj:5302) at clojure.lang.RestFn.applyTo(RestFn.java:137) at clojure.core$apply.invoke(core.clj:603) at clojure.core$require.doInvoke(core.clj:5381) at clojure.lang.RestFn.invoke(RestFn.java:408) at […]

测试java HBase连接

我正在尝试使用HBase Java API将数据写入HBase。 我通过Ambari安装了Hadoop / HBase。 以下是当前配置的配置方式: final Configuration CONFIGURATION = HBaseConfiguration.create(); final HBaseAdmin HBASE_ADMIN; HBASE_ADMIN = new HBaseAdmin(CONFIGURATION) 当我尝试写入HBase时,我会检查以确保该表存在 !HBASE_ADMIN.tableExists(tableName) 如果没有,请创建一个新的。 但是,似乎在尝试检查表是否存在时会抛出exception。 我想知道我是否没有正确连接到HBase …是否有任何好方法来validation配置是否正确以及我是否连接到HBase? 我得到的例外情况如下。 谢谢。 java.lang.RuntimeException: java.lang.NullPointerException at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:209) at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:288) at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:268) at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:140) at org.apache.hadoop.hbase.client.ClientScanner.(ClientScanner.java:135) at org.apache.hadoop.hbase.catalog.MetaReader.fullScan(MetaReader.java:597) at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:802) at org.apache.hadoop.hbase.catalog.MetaReader.tableExists(MetaReader.java:359) at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:287) at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:301) at com.business.project.hbase.HBaseMessageWriter.getTable(HBaseMessageWriter.java:40) at com.business.project.hbase.HBaseMessageWriter.write(HBaseMessageWriter.java:59) at com.business.project.hbase.HBaseMessageWriter.write(HBaseMessageWriter.java:54) […]

Storm:用于从端口读取数据的Spout

我需要写一个风暴喷口来从端口读取数据。 想知道这在逻辑上是否可行。 考虑到这一点,我设计了一个简单的拓扑结构,设计用于一个喷嘴和一个螺栓。 spout会收集使用wget发送的HTTP请求,而bolt会显示请求 – 就是这样。 我的喷口结构如下: public class ProxySpout extends BaseRichSpout{ //The O/P collector SpoutOutputCollector sc; //The socket Socket clientSocket; //The server socket ServerSocket sc; public ProxySpout(int port){ this.sc=new ServerSocket(port); try{ clientSocket=sc.accept(); }catch(IOException ex){ //Handle it } } public void nextTuple(){ try{ InputStream ic=clientSocket.getInputStream(); byte b=new byte[8196]; int len=ic.read(b); sc.emit(new Values(b)); ic.close(); }catch(//){ //Handle […]