如何使用apache风暴元组
我刚开始使用Apache Storm。 我阅读了教程并查看了示例我的问题是所有示例都使用非常简单的元组(通常一个字符串存档)。 元组是内联创建的(使用新值(…))。 在我的情况下,我有许多字段的元组(5..100)。 所以我的问题是如何为每个字段实现具有名称和类型(所有原始)的元组?
有什么例子吗? (我认为直接实施“元组”不是一个好主意)
谢谢
创建一个以所有字段作为值的元组的替代方法是创建一个bean并将其传递给元组内部。
鉴于以下课程:
public class DataBean implements Serializable { private static final long serialVersionUID = 1L; // add more properties as necessary int id; String word; public DataBean(int id, String word) { setId(id); setWord(word); } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } }
在一个bolt中创建和发出DataBean:
collector.emit(new Values(bean));
获取目标bolt中的DataBean:
@Override public void execute(Tuple tuple, BasicOutputCollector collector) { try { DataBean bean = (DataBean)tuple.getValue(0); // do your bolt processing with the bean } catch (Exception e) { LOG.error("WordCountBolt error", e); collector.reportError(e); } }
在设置拓扑时,不要忘记使bean可序列化并注册:
Config stormConfig = new Config(); stormConfig.registerSerialization(DataBean.class); // more stuff StormSubmitter.submitTopology("MyTopologyName", stormConfig, builder.createTopology());
免责声明:Beans可以很好地进行随机分组。 如果需要执行fieldsGrouping
,则仍应使用基元。 例如,在字数统计方案中,您需要逐个单词去,以便您可以发出:
collector.emit(new Values(word, bean));
我将按如下方式实现自定义元组/值类型:不是使用成员变量来存储数据,而是将每个属性映射到inheritance的Values
类型的对象列表中的固定索引。 这种方法避免了常规Bean的“字段分组”问题。
- 它不需要为字段分组添加额外的属性(非常不自然)
- 避免数据重复(减少发送的字节数)
- 它保留了豆类模式的优势
单词计数示例的示例如下:
public class WordCountTuple extends Values { private final static long serialVersionUID = -4386109322233754497L; // attribute indexes /** The index of the word attribute. */ public final static int WRD_IDX = 0; /** The index of the count attribute. */ public final static int CNT_IDX = 1; // attribute names /** The name of the word attribute. */ public final static String WRD_ATT = "word"; /** The name of the count attribute. */ public final static String CNT_ATT = "count"; // required for serialization public WordCountTuple() {} public WordCountTuple(String word, int count) { super.add(WRD_IDX, word); super.add(CNT_IDX, count); } public String getWord() { return (String)super.get(WRD_IDX); } public void setWort(String word) { super.set(WRD_IDX, word); } public int getCount() { return (Integer)super.get(CNT_IDX); } public void setCount(int count) { super.set(CNT_IDX, count); } public static Fields getSchema() { return new Fields(WRD_ATT, CNT_ATT); } }
为避免不一致,使用“word”和“count”属性的final static
变量。 此外,方法getSchema()
返回用于在Spout / Bolt方法中声明输出流的实现模式.declareOutputFields(...)
对于输出元组,此类型可以直接使用:
public MyOutBolt implements IRichBolt { @Override public void execute(Tuple tuple) { // some more processing String word = ... int cnt = ... collector.emit(new WordCountTuple(word, cnt)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(WordCountTuple.getSchema()); } // other methods omitted }
对于输入元组,我建议使用以下模式:
public MyInBolt implements IRichBolt { // use a single instance for avoid GC trashing private final WordCountTuple input = new WordCountTuple(); @Override public void execute(Tuple tuple) { this.input.clear(); this.input.addAll(tuple.getValues()); String word = input.getWord(); int count = input.getCount(); // do further processing } // other methods omitted }
MyOutBolt
和MyInBolt
可以连接如下:
TopologyBuilder b = ... b.setBolt("out", new MyOutBolt()); b.setBolt("in", new MyInBolt()).fieldsGrouping("out", WordCountTuple.WRD_ATT);
使用字段分组是直截了当的,因为WordCountTuple
允许单独访问每个属性。