如何在风暴中创建拓扑

我们是暴风雨的新手。 我们不知道如何创建拓扑结构,请帮助我们解决风暴问题。 我们尝试了“在Windows上运行风暴”一文中给出的示例wordcount c =拓扑。 但我们无法理解如何给出输入以及输入存在的位置以及风暴ui中输出的位置。

Storm UI中不存在输入和输出。在Storm UI中,您可以看到没有发出的元组,处理时间,群集配置和群集的运行状况。要查看输出和输入,请使用记录器机制,然后检查每个工作日志文件风暴包的日志文件夹。 要在Storm中创建拓扑,你需要两个东西,一个是喷口,一个是螺栓。请在下面找到样本代码: –

SampleSpout.java

import java.util.ArrayList; import java.util.List; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; public class SampleSpout implements IRichSpout{ SpoutOutputCollector collector; int i=0; List tupleList; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } @Override public void activate() { // TODO Auto-generated method stub } @Override public void deactivate() { // TODO Auto-generated method stub } @Override public void nextTuple() { tupleList=new ArrayList(); tupleList.add("storm"+i); tupleList.add(i); collector.emit(tupleList,i); i++; } @Override public void ack(Object msgId) { // TODO Auto-generated method stub } @Override public void fail(Object msgId) { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","count")); } @Override public Map getComponentConfiguration() { // TODO Auto-generated method stub return null; } } 

SampleBolt.java

 import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class SampleBolt implements IBasicBolt { private static Logger log = LoggerFactory.getLogger(SampleBolt.class); @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } @Override public Map getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } @Override public void execute(Tuple input, BasicOutputCollector collector) { log.info(input.getValues().toString()+"output values"); } @Override public void cleanup() { // TODO Auto-generated method stub } } 

SampleTopology.java

 import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; public class SampleTopology { /** * @param args */ public static void main(String[] args) { TopologyBuilder topology=new TopologyBuilder(); topology.setSpout("sampleSpout",new SampleSpout()); topology.setBolt("sampleBolt",new SampleBolt()).shuffleGrouping("sampleSpout"); Config conf = new Config(); conf.setDebug(true); LocalCluster cluster=new LocalCluster(); cluster.submitTopology("test", conf, topology.createTopology()); } }