我的 Storm 拓扑既不工作(没有任何输出),也不失败(没有报错,也没有抛出异常)

我有一个拓扑,用来统计由 SimulatorSpout(模拟数据,不是真正的流)生成的单词的出现次数,然后把结果写入 MySQL 数据库表。表结构非常简单:

| Field | Type        | ...            |
|-------|-------------|----------------|
| ID    | int(11)     | auto_increment |
| word  | varchar(50) |                |
| count | int(11)     |                |

但是我遇到了一个奇怪的问题(如前所述):我成功地把拓扑提交到了由 4 个 Supervisor 组成的 Storm 集群,在 Storm Web UI 中也能看到拓扑在运行(没有异常),但是当我检查 MySQL 表时,令我惊讶的是,表是空的……

欢迎任何评论和建议……

下面是 Spout 和 Bolt 的代码:

 /**
  * Factory for JDBC connections to the MySQL database used by the topology.
  *
  * <p>Every call to {@link #getConnection()} opens a new connection; the caller
  * owns the returned connection and is responsible for closing it.
  */
 public class MySQLConnection {
     // NOTE(review): host, user and password are hard-coded here and in getConnection();
     // consider moving them to external configuration.
     private static final String DB_URL = "jdbc:mysql://192.168.0.2:3306/test?";
     private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";

     /**
      * Loads the JDBC driver class and opens a new connection.
      *
      * @return a newly opened connection; never a shared instance
      * @throws SQLException if {@code DriverManager} cannot open the connection
      * @throws ClassNotFoundException if the driver class is not on the classpath
      */
     public static Connection getConnection() throws SQLException, ClassNotFoundException {
         Class.forName(DRIVER_CLASS);
         // Return the connection directly instead of parking it in a shared static
         // field first: with a shared field, two threads calling this method at the
         // same time could both be handed the same (last assigned) connection.
         return DriverManager.getConnection(DB_URL, "root", "qwe123");
     }
 }

============================= SentenceSpout ==================== ===========

 /**
  * Spout that cycles endlessly through a fixed list of sample sentences,
  * emitting one sentence per {@link #nextTuple()} call in the single output
  * field {@code "sentence"}.
  */
 public class SentenceSpout extends BaseRichSpout {
     private static final long serialVersionUID = 1L;

     private boolean _completed = false;
     private SpoutOutputCollector _collector;
     private int index = 0;
     private String[] sentences = {
         "Obama delivered a powerfull speech against USA",
         "I like cold beverages",
         "RT http://www.turkeyairline.com Turkish Airlines has delayed some flights",
         "don't have a cow man...",
         "i don't think i like fleas"
     };

     /** Declares the single output field, {@code "sentence"}. */
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("sentence"));
     }

     /** Stores the collector that {@link #nextTuple()} emits through. */
     public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
         this._collector = collector;
     }

     /**
      * Emits the sentence at the current position and advances. After the last
      * sentence the position wraps to the start and {@code Utils.waitForSeconds(1)}
      * is called (presumably a one-second pause -- confirm against that helper).
      */
     public void nextTuple() {
         // NOTE(review): the tuple is emitted without a message id; confirm whether
         // ack()/fail() below are expected to be invoked for these tuples.
         final String current = sentences[index];
         _collector.emit(new Values(current));
         index += 1;
         if (index < sentences.length) {
             return;
         }
         index = 0;
         Utils.waitForSeconds(1);
     }

     /** Prints the acknowledged message id to standard output. */
     public void ack(Object msgId) {
         System.out.println("OK: " + msgId);
     }

     /** Prints the failed message id to standard output. */
     public void fail(Object msgId) {
         System.out.println("FAIL: " + msgId);
     }

     /** Nothing to release. */
     public void close() {
     }
 }

============================ SplitSentenceBolt ===================== =========

 /**
  * Bolt that cleans an incoming sentence and emits each remaining word as a
  * separate tuple in the single output field {@code "word"}.
  *
  * <p>Cleaning removes URLs, the stand-alone token {@code RT}, and the
  * characters {@code .}, {@code |} and {@code ,}.
  */
 public class SplitSentenceBolt extends BaseRichBolt {
     private static final long serialVersionUID = 1L;

     // Compiled once instead of on every tuple (String.replaceAll recompiles each call).
     private static final java.util.regex.Pattern URL_PATTERN = java.util.regex.Pattern.compile(
             "((https?|ftp|telnet|gopher|file)):((//)|(\\\\))+[\\w\\d:#@%/;$()~_?\\+-=\\\\\\.&]*");
     // Word boundaries keep "RT" inside longer words (e.g. "START") intact.
     private static final java.util.regex.Pattern RETWEET_MARKER =
             java.util.regex.Pattern.compile("\\bRT\\b");
     // Inside a character class '|' is a literal, so pipes are stripped as well.
     private static final java.util.regex.Pattern PUNCTUATION =
             java.util.regex.Pattern.compile("[.|,]");

     private OutputCollector _collector;

     /** Stores the collector used to emit words and ack input tuples. */
     public void prepare(Map config, TopologyContext context, OutputCollector collector) {
         _collector = collector;
     }

     /**
      * Reads the {@code "sentence"} field, strips URLs, the RT marker and
      * punctuation, splits on single spaces and emits every non-empty word
      * anchored to the input tuple, then acks the input tuple.
      *
      * @param tuple input tuple carrying a {@code "sentence"} string field
      */
     public void execute(Tuple tuple) {
         String sentence = tuple.getStringByField("sentence");
         sentence = URL_PATTERN.matcher(sentence).replaceAll("");
         sentence = RETWEET_MARKER.matcher(sentence).replaceAll("");
         sentence = PUNCTUATION.matcher(sentence).replaceAll("");

         for (String token : sentence.split(" ")) {
             // Trim before the emptiness check so a whitespace-only token
             // (e.g. a tab) is not emitted as an empty word.
             String word = token.trim();
             if (!word.isEmpty()) {
                 // Anchor to the input tuple so a downstream failure is tied to it.
                 _collector.emit(tuple, new Values(word));
             }
         }
         _collector.ack(tuple);
     }

     /** Declares the single output field, {@code "word"}. */
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("word"));
     }
 }

=========================== WordCountBolt ====================== ===========

 public class WordCountBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private HashMap counts = null; private OutputCollector _collector; private ResultSet resSet = null; private Statement stmt = null; private Connection _conn = null; private String path = "/home/hduser/logOfStormTops/logger.txt"; String rLine = null; public void prepare (Map config, TopologyContext context, OutputCollector collector) { counts = new HashMap(); _collector = collector; } public void execute (Tuple tuple) { int insertResult = 0; int updateResult = 0; String word = tuple.getStringByField("word"); //---------------------------------------------------- if (!counts.containsKey(word)) { counts.put(word, 1); try { insertResult = wordInsertIfNoExist(word); if (insertResult == 1) { _collector.ack(tuple); } else { _collector.fail(tuple); } } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (SQLException e) { e.printStackTrace(); } } else { //----------------------------------------------- counts.put(word, counts.get(word) + 1); try { // writing to db updateResult = updateCountOfExistingWord(word); if (updateResult == 1) { _collector.ack(tuple); } else { _collector.fail(tuple); } // Writing to file BufferedWriter buffer = new BufferedWriter(new FileWriter(path)); buffer.write("[ " + word + " : " + counts.get("word") + " ]"); buffer.newLine(); buffer.flush(); buffer.close(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (SQLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } System.out.println("{word-" + word + " : count-" + counts.get(word) + "}"); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { } // ***************************************************** public int wordInsertIfNoExist(String word) throws ClassNotFoundException, SQLException { String query = "SELECT word FROM wordcount WHERE word=\"" + word + "\""; String insert = "INSERT INTO wordcount (word, count) VALUES (\"" + 
word + "\", 1)"; _conn = MySQLConnection.getConnection(); stmt = _conn.createStatement(); resSet = stmt.executeQuery(query); int res = 0; if (!resSet.next()) { res = stmt.executeUpdate(insert); } else { System.out.println("Yangi qiymatni kirityotrganda nimadir sodir bo'ldi"); } resSet.close(); stmt.close(); _conn.close(); return res; } public int updateCountOfExistingWord(String word) throws ClassNotFoundException, SQLException { String update = "UPDATE wordcount SET count=count+1 WHERE word=\"" + word + "\""; _conn = MySQLConnection.getConnection(); stmt = _conn.createStatement(); int result = stmt.executeUpdate(update); //System.out.println(word + "'s count has been updated (incremented)"); resSet.close(); stmt.close(); _conn.close(); return result; } } 

========================= WordCountTopology ======================== ======

 /**
  * Entry point that wires the sentence spout, the split bolt and the count
  * bolt into one topology and submits it under the name
  * {@code "NewWordCountTopology"}.
  */
 public class WordCountTopology {
     private static final String SENTENCE_SPOUT_ID = "sentence-spout";
     private static final String SPLIT_BOLT_ID = "split-bolt";
     private static final String COUNT_BOLT_ID = "count-bolt";
     private static final String TOPOLOGY_NAME = "NewWordCountTopology";

     /**
      * Builds and submits the topology.
      *
      * <p>Wiring: spout (parallelism hint 2) -> split bolt (hint 4, shuffle
      * grouping) -> count bolt (hint 4, fields grouping on {@code "word"}).
      * The config sets max spout pending to 100 and enables debug.
      *
      * @param args unused
      * @throws AlreadyAliveException propagated from {@code StormSubmitter.submitTopology}
      * @throws InvalidTopologyException propagated from {@code StormSubmitter.submitTopology}
      */
     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
         SentenceSpout spout = new SentenceSpout();
         SplitSentenceBolt splitBolt = new SplitSentenceBolt();
         WordCountBolt countBolt = new WordCountBolt();

         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
         builder.setBolt(SPLIT_BOLT_ID, splitBolt, 4).shuffleGrouping(SENTENCE_SPOUT_ID);
         builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

         Config config = new Config();
         config.setMaxSpoutPending(100);
         config.setDebug(true);

         // submitTopology is static: call it on the class rather than through a
         // throwaway instance hidden behind @SuppressWarnings("static-access").
         StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
     }
 }

这是因为抛出异常时不会调用 _collector.ack(tuple)。当待处理(尚未确认)的元组过多时,Spout 会停止发送新的元组。请尝试抛出 RuntimeException,而不是调用 printStackTrace。