Apache Storm远程拓扑提交

我一直在使用IDE(Eclipse)测试Storm Topologies的远程提交。 我成功地将简单的风暴拓扑上传到远程Storm集群,但奇怪的是当我检查Storm UI以确定远程提交的拓扑是否正常工作时,我看到UI中只有_acker bolt而其他螺栓和鲸鱼喷水不存在。 之后,我从命令行手动提交拓扑,并再次检查Storm UI,它正在工作,因为它应该没有问题。 我一直在找原因但找不到。 我在下面附加了拓扑和远程提交者类以及相应的Storm UI图片:

这是Eclipse控制台的输出(远程提交后)

225 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar T:\STORM_TOPOLOGIES\Benchmark.jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar 234 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar 

这是拓扑:

 public class StormBenchmark { // ****************************************************************************************** public static class GenSpout extends BaseRichSpout { //private static final Logger logger = Logger.getLogger(StormBenchmark.class.getName()); private Long count = 1L; private Object msgID; private static final long serialVersionUID = 1L; private static final Character[] CHARS = new Character[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'}; private static final String[] newsagencies = {"bbc", "cnn", "reuters", "aljazeera", "nytimes", "nbc news", "fox news", "interfax"}; SpoutOutputCollector _collector; int _size; Random _rand; String _id; String _val; // Constructor public GenSpout(int size) { _size = size; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); _id = randString(5); _val = randString2(_size); } //Business logic public void nextTuple() { count++; msgID = count; _collector.emit(new Values(_id, _val), msgID); } public void ack(Object msgID) { this.msgID = msgID; } private String randString(int size) { StringBuffer buf = new StringBuffer(); for(int i=0; i<size; i++) { buf.append(CHARS[_rand.nextInt(CHARS.length)]); } return buf.toString(); } private String randString2(int size) { StringBuffer buf = new StringBuffer(); for(int i=0; i<size; i++) { buf.append(newsagencies[_rand.nextInt(newsagencies.length)]); } return buf.toString(); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "item")); } } // ======================================================================================================= // =================================== BOLT =========================================================== public static class IdentityBolt extends BaseBasicBolt { private static final long serialVersionUID = 1L; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "item")); } public void execute(Tuple tuple, BasicOutputCollector collector) { String character = tuple.getString(0); String agency = tuple.getString(1); List box = new ArrayList(); box.add(character); box.add(agency); try { fileWriter(box); } catch (IOException e) { e.printStackTrace(); } box.clear(); } public void fileWriter(List listjon) throws IOException { String pathname = "/home/hduser/logOfStormTops/logs.txt"; File file = new File(pathname); if (!file.exists()){ file.createNewFile(); } BufferedWriter writer = new BufferedWriter(new FileWriter(file, true)); writer.write(listjon.get(0) + " : " + listjon.get(1)); writer.newLine(); writer.flush(); writer.close(); } } //storm jar storm-benchmark-0.0.1-SNAPSHOT-standalone.jar storm.benchmark.ThroughputTest demo 100 8 8 8 10000 public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new GenSpout(8), 2).setNumTasks(4); builder.setBolt("bolt", new IdentityBolt(), 4).setNumTasks(8) .shuffleGrouping("spout"); Config conf = new Config(); conf.setMaxSpoutPending(200); conf.setStatsSampleRate(0.0001); //topology.executor.receive.buffer.size: 8192 #batched //topology.executor.send.buffer.size: 8192 #individual messages //topology.transfer.buffer.size: 1024 # batched conf.put("topology.executor.send.buffer.size", 1024); conf.put("topology.transfer.buffer.size", 8); conf.put("topology.receiver.buffer.size", 8); conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xdebug -Xrunjdwp:transport=dt_socket,address=1%ID%,server=y,suspend=n"); StormSubmitter.submitTopology("SampleTop", conf, builder.createTopology()); } } 

这是RemoteSubmitter类:

 public class RemoteSubmissionTopo { @SuppressWarnings({ "unchecked", "rawtypes", "unused" }) public static void main(String... args) { Config conf = new Config(); TopologyBuilder topoBuilder = new TopologyBuilder(); conf.put(Config.NIMBUS_HOST, "117.16.142.49"); conf.setDebug(true); Map stormConf = Utils.readStormConfig(); stormConf.put("nimbus.host", "117.16.142.49"); String jar_path = "T:\\STORM_TOPOLOGIES\\Benchmark.jar"; Client client = NimbusClient.getConfiguredClient(stormConf).getClient(); try { NimbusClient nimbus = new NimbusClient(stormConf, "117.16.142.49", 6627); String uploadedJarLocation = StormSubmitter.submitJar(stormConf, jar_path); String jsonConf = JSONValue.toJSONString(stormConf); nimbus.getClient().submitTopology("benchmark-tp", uploadedJarLocation, jsonConf, topoBuilder.createTopology()); } catch (TTransportException e) { e.printStackTrace(); } catch (AlreadyAliveException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvalidTopologyException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (TException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { Thread.sleep(6000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } 

这是Storm UI pict(如果是远程提交)

在此处输入图像描述

这是另一个Storm UI pict(在手动提交的情况下)

在此处输入图像描述

RemoteSubmissionTopo你使用TopologyBuilder topoBuilder = new TopologyBuilder(); 但是不要调用setSpout(...) / setBolt(...) 。 因此,您提交的拓扑没有运算符……

顺便说一下:实际上根本不需要RemoteSubmissionTopo 。 您可以使用StormBenchmark远程提交。 只需添加conf.put(Config.NIMBUS_HOST, "117.16.142.49");main和set JVM选项-Dstorm.jar=/path/to/topology.jar你很好运行。