博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm入门(2)--Storm编程
阅读量:4558 次
发布时间:2019-06-08

本文共 9256 字,大约阅读时间需要 30 分钟。

以电信通话记录为例

移动呼叫及其持续时间将作为对Apache Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数。

 编程思想:

在storm中,把对数据的处理过程抽象成一个topology,这个topology包含的组件主要是spout、bolt,以及以tuple形式在组件之间传输的数据流。这个数据流在topology流一遍,就是对数据的一次处理。

1、创建Spout类

这一部分,是创建数据流的源头。

创建一个类,实现IRichSpout接口,实现相应方法。其中几个方法的含义:

  • open -为Spout提供执行环境。执行器将运行此方法来初始化喷头。一般写一些第一次运行时要处理的逻辑
  • nextTuple -通过收集器发出生成的数据。核心,用于生成数据流
  • close -当spout将要关闭时调用此方法。
  • declareOutputFields -声明元组的输出模式。即,声明了从此spout出去的流都的数据格式
  • ack -确认处理了特定元组。
  • fail -指定不处理和不重新处理特定元组。
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - 为此spout提供storm配置。
  • context - 提供有关拓扑中的spout位置,其任务ID,输入和输出信息的完整信息。
  • collector - 使我们能够发出将由bolts处理的元组。
nextTuple()

nextTuple()从与ack()和fail()方法相同的循环中定期调用。它必须释放线程的控制,当没有工作要做,以便其他方法有机会被调用。因此,nextTuple的第一行检查处理是否已完成。如果是这样,它应该休眠至少一毫秒,以减少处理器在返回之前的负载。

declareOutputFields(OutputFieldsDeclarer declarer)

declarer -它用于声明输出流id,输出字段等,此方法用于指定元组的输出模式。

ack(Object msgId)

该方法确认已经处理了特定元组。

fail(Object o)

此方法通知特定元组尚未完全处理。 Storm将重新处理特定的元组

package com.jing.calllogdemo;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichSpout;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Values;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.Random;/*spout类,负责产生数据流 */public class CallLogSpout implements IRichSpout {    //spout 输出收集器    private SpoutOutputCollector collector;    //是否完成    private boolean completed = false;    //上下文对象    private TopologyContext context;    //随机发生器    private Random randomGenerator = new Random();    //索引    private Integer idx = 0;    @Override    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {        //第一次运行要做的事        this.context = topologyContext;        this.collector = spoutOutputCollector;    }    @Override    public void close() {    }    @Override    public void activate() {    }    @Override    public void deactivate() {    }    @Override    public void nextTuple() {        //产生第一条数据,        if (this.idx <= 1000){            List
mobileNumbers = new ArrayList
(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while (localIdx++ < 100 && this.idx++ <1000){ //取出主叫 String caller = mobileNumbers.get(randomGenerator.nextInt(4)); //取出被叫 String callee = mobileNumbers.get(randomGenerator.nextInt(4)); while (caller == callee){ //重新取出被叫 callee = mobileNumbers.get(randomGenerator.nextInt(4)); } //模拟通话时长 Integer duration = randomGenerator.nextInt(60); //输出元祖 this.collector.emit(new Values(caller,callee,duration)); } } } @Override public void ack(Object o) { } @Override public void fail(Object o) { } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //声明输出字段,定义元组的结构,定义输出字段名称 outputFieldsDeclarer.declare(new Fields("from", "to", "duration")); } @Override public Map
getComponentConfiguration() { return null; }}
CallLogSpout

2、创建Bolt类

这一部分是完成对数据流的处理,Bolt把元组作为输入,对元组进行处理后,产生新的元组。

创建一个类,实现IRichBolt接口,实现相应方法。

  • prepare -为bolt提供要执行的环境。执行器将运行此方法来初始化spout。
  • execute -处理单个元组的输入
  • cleanup -当spout要关闭时调用。
  • declareOutputFields -声明元组的输出模式。
prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf -为此bolt提供Storm配置。
  • context -提供有关拓扑中的bolt位置,其任务ID,输入和输出信息等的完整信息。
  • collector -使我们能够发出处理的元组。
execute(Tuple tuple)

这是bolt的核心方法,这里的元组是要处理的输入元组。execute方法一次处理单个元组。元组数据可以通过Tuple类的getValue方法访问。不必立即处理输入元组。多元组可以被处理和输出为单个输出元组。处理的元组可以通过使用OutputCollector类发出。

cleanup()
declareOutputFields(OutputFieldsDeclarer declarer)

这个方法用于指定元组的输出模式,参数declarer用于声明输出流id,输出字段等。

这里有两个bolt

呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”

package com.jing.calllogdemo;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;import java.util.Map;/*创建calllog日志的bolt */public class CallLogCreatorBolt implements IRichBolt {    private OutputCollector collector;    @Override    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {        this.collector = outputCollector;    }    @Override    public void execute(Tuple tuple) {        //处理新的同话记录        String from = tuple.getString(0);        String to = tuple.getString(1);        Integer duration = tuple.getInteger(2);        //产生新的tuple        String fromTO = from + "-" + to;        collector.emit(new Values(fromTO, duration));    }    @Override    public void cleanup() {    }    @Override    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {        //设置输出字段的名称        outputFieldsDeclarer.declare(new Fields("call", "duration"));    }    @Override    public Map
getComponentConfiguration() { return null; }}
CallLogCreatorBolt

 呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。

package com.jing.calllogdemo;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import java.util.HashMap;import java.util.Map;/*通话记录计数器bolt */public class CallLogCounterBolt implements IRichBolt {    Map
counterMap; private OutputCollector collector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.counterMap = new HashMap
(); this.collector = outputCollector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else { Integer c = counterMap.get(call) + duration; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry
entry : counterMap.entrySet()){ System.out.println(entry.getKey() + " : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("call")); } @Override public Map
getComponentConfiguration() { return null; }}
CallLogCounterBolt

 3、创建执行入口类,构建Topology

Storm拓扑基本上是一个Thrift结构。 TopologyBuilder类提供了简单而容易的方法来创建复杂的拓扑。TopologyBuilder类具有设置spout
(setSpout)和设置bolt
(setBolt)的方法。最后,TopologyBuilder有createTopology来创建拓扑。使用以下代码片段创建拓扑 -
package com.jing.calllogdemo;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.generated.AlreadyAliveException;import org.apache.storm.generated.AuthorizationException;import org.apache.storm.generated.InvalidTopologyException;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.tuple.Fields;public class App {    public static void main(String[] args) throws InterruptedException, InvalidTopologyException, AuthorizationException, AlreadyAliveException {        TopologyBuilder builder = new TopologyBuilder();        //设置spout        builder.setSpout("spout", new CallLogSpout());        //设置creator-bolt        builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");        //设置countor-bolt        builder.setBolt("counter-bolt", new CallLogCounterBolt()).                fieldsGrouping("creator-bolt", new Fields("call"));        Config config = new Config();        config.setDebug(true);        /*本地模式        LocalCluster cluster = new LocalCluster();        cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());        Thread.sleep(10000);        cluster.shutdown();         */        StormSubmitter.submitTopology("myTop", config, builder.createTopology());    }}
App

 

 

为了开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑。 “submitTopology”的参数之一是“Config”类的实例。“Config”类用于在提交拓扑之前设置配置选项。此配置选项将在运行时与集群配置合并,并使用prepare方法发送到所有任务(spout和bolt)。一旦拓扑提交到集群,我们将等待10秒钟,集群计算提交的拓扑,然后使用“LocalCluster”的“shutdown”方法关闭集群。完整的程序代码如下 -
 
 
参考:
作者:raincoffee
链接:https://www.jianshu.com/p/7af9693d9ffc
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

 

生产环境的集群上运行topology

1)修改提交方式,在代码中

2)导出jar包 mvn

3)在linux上运行topologys

&>storm jar XXX.jar  full.class.name

 

转载于:https://www.cnblogs.com/Jing-Wang/p/11028749.html

你可能感兴趣的文章
分布式监控解决方案zabbix03-监控mysql
查看>>
[转]DB2错误代码大全
查看>>
[转]AJAX 简介
查看>>
【转】SWT/JFace的对话框
查看>>
2019-2-28作业
查看>>
VS2010+WDK开发环境搭建最简易方法
查看>>
果然逆天,处理一亿条int32数据排序,需要耗时32秒
查看>>
解锁Dagger2使用姿势(二) 之带你理解@Scope
查看>>
设计模式——抽象工厂模式详解
查看>>
十年,青春就是一转眼的事
查看>>
C++编程规范和各种资源
查看>>
两只小熊队高级软件工程第七次作业敏捷冲刺2
查看>>
[原创]如何编写多个阻塞队列连接下的多生产者多消费者的Python程序
查看>>
如何提高数据迁移和复制的速度
查看>>
.9图片
查看>>
android打开各种文件Intent
查看>>
[转][C#]单例模式之懒加载
查看>>
实验吧之【后台登录,加了料的报错注入,认真一点】(报错注入)
查看>>
MySQL无法存储Emoji表情问题
查看>>
HDFS集中式的缓存管理原理与代码剖析
查看>>