博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm的数据源编程单元Spout学习整理
阅读量:6091 次
发布时间:2019-06-20

本文共 2901 字,大约阅读时间需要 9 分钟。

Spout呢,是Topology中数据流的源头,也是Storm针对数据源的编程单元。一般数据的来源,是通过外部数据源来读取数据项(Tuple),并读取的数据项传输至作业的其他组件。编程人员一般可通过OutputFieldsDeclarer类的declareStream()方法来声明多个流,指定数据将要发送的流,然后使用SpoutOutputCollector的emit方法将数据发送。

这里整理了下ISpout和IComponent接口。

ISpout声明了Spout的核心方法,用于向Topology供给数据项。对于每一个发出的数据项,Storm通过Spout,可以追踪它经历处理过程的有向无环图(竟然也是DAG)。

void open (java,util.Map conf,TopologyContext context,SpoutOutputCollector collector)用于实例化Spout的一个运行时任务,被急群众的某一进程调用 (conf对象维护Storm中针对该Spout的配置信息,context是一个上下文对象,可用于获取该组件运行时任务的信息,collector用于从该Spout发送数据项)void close() 用于停止一个Spoutvoid activate()在Spout从非激活状态转换为激活状态时被调用void deactivate()在Spout的非激活状态被调用 void ack(java.lang.Object msgId) Storm用于确认该Spout发送的这个数据项已经被完整处理void fail(java.lang.Object msgId)Storm用于确认该Spout发送的这个数据项已经失败void nextTuple()当这个方法被调用时,Storm要求Spout发送一个数据项至output collector (nextTuple是Spout向Topology中发送一个数据项,是Spout需要实现的最重要的方法。在可靠的Spout的一个任务中,nextTuple()、ack()、fail()三个方法的调用在一个单独线程中循环。当不存在数据项需要发送时,nextTuple()将会休眠一小段间隔,确保不会浪费过多的CPU资源)

 

IComponent接口,声明了Topology组件的通用方法。使用JAVA语言的Spout和Bolt都必须实现这个接口。

void declareOutputFields(OutputFieldsDeclarer declarer)声明指定输出流的数据项结构。(这里指定了输出流的数据项结构(schema)。参数declarer被用来声明输出流(stream)的id,域。java.util.Map  getComponentConfiguration()获取组件的配置信息

 

以Storm官网的WordCount来说明就是:

public class WordCount extends BaseRichSpout{    public static Logger log = logger.getLogger(backtype/storm/testing/WordCount);    boolean_isDistributed;    SpoutOutputCollector_collector;        public WordCount(){        this(true);    }        public WordCount(boolean isDistributed){        _isDistributed = isDistributed;    }    public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){    _collector = collector;    }    public void close(){    }    public void nextTuple(){       Utils.sleep(100L);        String words[] = {        "nathan","mike","jackson","golda","bertels"        };    public void ack(Object obj){    }    public void fail(Object obj){    }    public void declareOutputFields(OutputFieldsDeclarer declarer){        declarer.declarer(new Fields(new String[] {            "word"    }));    }    public Map getComponentConfiguration(){        if(!_isDistributed_)        {            Map ret = new HashMap();            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM,Integer.valueOf(1));}else{        return null;        }    }}

1、类中有对WordCount的两个重载的构造函数,其中_isDistributed指明了Spout的并行度,若_isDistributed=false,则意味着这个Spout运行时仅有一份任务实例。

2、open()函数的实现,将传入的collector赋值给局部变量,使之后通过该局部变量来操作数据项的发送。

3、declareOutputFields()函数,生命了输出流的数据项结构。

4、nextTuple函数,让一只执行的线程休眠100毫秒,再继续执行下述函数体,通过线程的休眠,控制nextTuple()产生数据项的周期为0.1秒。并且在维护字符串数组中,随机挑选一个字符串,作为"word"的域,交给变量collector作为一个Tuple发送。 (ack的作用是确认数据项是否被完整处理,这里没做处理)

5、getComponentConfiguration()函数则返回组建的配置信息(这个实例中只有在_isDistributed=false时,才返回包含该配置项的Map数据结构。

6、其他重载函数都为空实现。

那么在Topology实现类的main函数使其作为一个spout:

TopologyBuilder builder = new TopologyBuilder();    builder.setSpout("sentenceGenSpout",new WordCount());

 

转载地址:http://smmwa.baihongyu.com/

你可能感兴趣的文章
DisparityCostVolumeEstimator.cpp
查看>>
(转)git中关于fetch的使用
查看>>
mongo DB for C#
查看>>
caffe整体框架的学习的博客,这个博客山寨了一个caffe框架
查看>>
git只拉取github部分代码的方法
查看>>
[LeetCode] Construct Quad Tree 建立四叉树
查看>>
如何避免SHRINKDATABASE & SHRINKFILE 产生索引碎片(转载)
查看>>
【SSH网上商城项目实战02】基本增删查改、Service和Action的抽取以及使用注解替换xml...
查看>>
高阶函数简述 js
查看>>
Java CompletableFuture:allOf等待所有异步线程任务结束
查看>>
Highmaps网页图表教程之图表配置项结构与商业授权
查看>>
mysql 5.6.33发布
查看>>
java 获取URL链接 内容
查看>>
Linux 命令详解(二)awk 命令
查看>>
Android动态载入Dex机制解析
查看>>
PostgreSQL数据库中的常见错误
查看>>
jquery 控制 video 视频播放和暂停
查看>>
XCode调试多线程遭遇海森伯效应一例
查看>>
ie6下浮动使绝对定位元素莫名消失的问题
查看>>
FBReaderJ 1.6.3 发布,Android 电子书阅读器
查看>>