如何使用IDE在Storm生产群集中提交拓扑

我正面临一个问题Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload在使用IDE向生产集群提交拓扑时Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload ,而如果我使用storm jar命令在命令行执行,则相同像天堂一样奔跑。 我从githublink看过相同的例子 。

对于提交拓扑,我使用这些行

  conf.put(Config.NIMBUS_HOST, NIMBUS_NODE); conf.put(Config.NIMBUS_THRIFT_PORT,6627); conf.put(Config.STORM_ZOOKEEPER_PORT,2181); conf.put(Config.STORM_ZOOKEEPER_SERVERS,ZOOKEEPER_ID); conf.setNumWorkers(20); conf.setMaxSpoutPending(5000); StormSubmitter submitter = new StormSubmitter(); submitter.submitTopology("test", conf, builder.createTopology()); 

如果这是正确的运行方法,请建议我?

很好找到解决方案。 当我们运行“暴风jar”时,它会在提交的jar中触发storm.jar的属性标志。 因此,如果我们想以编程方式提交jar,那么只需这样设置标志即可

System.setProperty("storm.jar", );

例如:

 System.setProperty("storm.jar", "/Users/programming/apache-storm-1.0.1/lib/storm-core-1.0.1.jar"); StormSubmitter.submitTopology("myTopology", config, builder.createTopology()); 

要将拓扑提交到远程Storm集群,您需要将该jar上传到nimbus机器,然后使用NimbusClient将该jar提交到Cluster。
你可以这样做:

 Map storm_conf = Utils.readStormConfig(); storm_conf.put("nimbus.host", ""); Client client = NimbusClient.getConfiguredClient(storm_conf) .getClient(); String inputJar = "C:\\workspace\\TestStormRunner\\target\\TestStormRunner-0.0.1-SNAPSHOT-jar-with-dependencies.jar"; NimbusClient nimbus = new NimbusClient(storm_conf, "", ); // upload topology jar to Cluster using StormSubmitter String uploadedJarLocation = StormSubmitter.submitJar(storm_conf, inputJar); String jsonConf = JSONValue.toJSONString(storm_conf); nimbus.getClient().submitTopology("testtopology", , jsonConf, builder.createTopology()); 

以下是工作示例: 向Remote Storm Cluster提交拓扑

我没有运行java代码提交自己,但我检查了storm命令 – 它是一个python文件,运行java和http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html类

我认为你唯一应该担心的是 – 在执行它时包括所有需要的库。

我已经基于@ abhi和@Nishu Tayal的答案解决了这个问题,我想在这里发布我的代码:

 public static void submitLocalTopologyWay1(String topologyName, Config topologyConf, StormTopology topology, String localJar) { try { //get default storm config Map defaultStormConf = Utils.readStormConfig(); defaultStormConf.putAll(topologyConf); //set JAR System.setProperty("storm.jar",localJar); //submit topology StormSubmitter.submitTopology(topologyName, defaultStormConf, topology); } catch (Exception e) { String errorMsg = "can't deploy topology " + topologyName + ", " + e.getMessage(); System.out.println(errorMsg); e.printStackTrace(); } } public static void submitLocalTopologyWay2(String topologyName, Config topologyConf, StormTopology topology, String localJar) { try { //get nimbus client Map defaultStormConf = Utils.readStormConfig(); defaultStormConf.putAll(topologyConf); Client client = NimbusClient.getConfiguredClient(defaultStormConf).getClient(); //upload JAR String remoteJar = StormSubmitter.submitJar(defaultStormConf, localJar); //submit topology client.submitTopology(topologyName, remoteJar, JSONValue.toJSONString(topologyConf), topology); } catch (Exception e) { String errorMsg = "can't deploy topology " + topologyName + ", " + e.getMessage(); System.out.println(errorMsg); e.printStackTrace(); } } 

那么这是一个测试,您必须首先将代码构建到JAR文件。

 public void testSubmitTopologySubmitLocalTopologyWay1() { Config config = new Config(); config.put(Config.NIMBUS_HOST,"9.119.84.179"); config.put(Config.NIMBUS_THRIFT_PORT, 6627); config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("9.119.84.177","9.119.84.178","9.119.84.176")); config.put(Config.STORM_ZOOKEEPER_PORT,2181); config.put(Config.TOPOLOGY_WORKERS, 3); RemoteSubmitter.submitLocalTopologyWay1("word-count-test-1", config, WordCountTopology.buildTopology(), // your topology "C:\\MyWorkspace\\project\\storm-sample-0.0.1-SNAPSHOT-jar-with-dependencies.jar");//the JAR file }