使用storm实现实时大数据分析

上传人:第*** 文档编号:32821119 上传时间:2018-02-12 格式:DOC 页数:17 大小:241KB
返回 下载 相关 举报
使用storm实现实时大数据分析_第1页
第1页 / 共17页
使用storm实现实时大数据分析_第2页
第2页 / 共17页
使用storm实现实时大数据分析_第3页
第3页 / 共17页
使用storm实现实时大数据分析_第4页
第4页 / 共17页
使用storm实现实时大数据分析_第5页
第5页 / 共17页
点击查看更多>>
资源描述

《使用storm实现实时大数据分析》由会员分享,可在线阅读,更多相关《使用storm实现实时大数据分析(17页珍藏版)》请在金锄头文库上搜索。

1、使用 Storm 实现实时大数据分析当今世界,公司的日常运营经常会生成 TB 级别的数据。数据来源囊括了互联网装置可以捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中创建的数据。考虑到数据的生成量,实时处理成为了许多机构需要面对的首要挑战。我们经常用的一个非常有效的开源实时计算工具就是 Storm Twitter 开发,通常被比作“实时的 Hadoop”。然而 Storm 远比 Hadoop 来的简单,因为用它处理大数据不会带来新老技术的交替。Shruthi Kumar、Siddharth Patankar 共同效力于 Infosys,分别从事技术分析和研发工作。本文详述了

2、 Storm 的使用方法,例子中的项目名称为 “超速报警系统( Speeding Alert System) ”。我们想实现的功能是:实时分析过往车辆的数据,一旦车辆数据超过预设的临界值 便触发一个 trigger 并把相关的数据存入数据库。Storm对比 Hadoop 的批处理,Storm 是个实时的、分布式以及具备高容错的计算系统。同Hadoop 一样 Storm 也可以处理大批量的数据,然而 Storm 在保证高可靠性的前提下还可以让处理进行的更加实时;也就是说,所有的信息都会被处理。Storm 同样还具备容错和分布计算这些特性,这就让 Storm 可以扩展到不同的机器上进行大批量的数据

3、处理。他同样还有以下的这些特性:易于扩展。对于扩展,你只需要添加机器和改变对应的 topology(拓扑)设置。Storm 使用Hadoop Zookeeper 进行集群协调,这样可以充分的保证大型集群的良好运行。每条信息的处理都可以得到保证。Storm 集群管理简易。Storm 的容错机能:一旦 topology 递交,Storm 会一直运行它直到 topology 被废除或者被关闭。而在执行中出现错误时,也会由 Storm 重新分配任务。尽管通常使用 Java,Storm 中的 topology 可以用任何语言设计。当然为了更好的理解文章,你首先需要安装和设置 Storm。需要通过以下几个

4、简单的步骤:从 Storm 官方下载 Storm 安装文件将 bin/directory 解压到你的 PATH 上,并保证 bin/storm 脚本是可执行的。Storm 组件Storm 集群主要由一个主节点和一群工作节点(worker node)组成,通过 Zookeeper 进行协调。主节点:主节点通常运行一个后台程序 Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于 Hadoop 中的 Job Tracker。工作节点:工作节点同样会运行一个后台程序 Supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是 topology 中一个子集的实

5、现。而 Nimbus 和 Supervisor 之间的协调则通过 Zookeeper 系统或者集群。ZookeeperZookeeper 是完成 Supervisor 和 Nimbus 之间协调的服务。而应用程序实现实时的逻辑则被封装进 Storm 中的 “topology”。topology 则是一组由 Spouts(数据源)和 Bolts(数据操作)通过 Stream Groupings 进行连接的图。下面对出现的术语进行更深刻的解析。Spout:简而言之,Spout 从来源处读取数据并放入 topology。Spout 分成可靠和不可靠两种;当Storm 接收失败时,可靠的 Spout

6、会对 tuple(元组,数据项组成的列表)进行重发;而不可靠的 Spout 不会考虑接收成功与否只发射一次。而 Spout 中最主要的方法就是nextTuple() ,该方法会发射一个新的 tuple 到 topology,如果没有新 tuple 发射则会简单的返回。Bolt:Topology 中所有的处理都由 Bolt 完成。Bolt 可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。Bolt 从 Spout 中接收数据并进行处理,如果遇到复杂流的处理也可能将 tuple 发送给另一个 Bolt 进行处理。而 Bolt 中最重要的方法是 execute() ,以新的tuple

7、作为参数接收。不管是 Spout 还是 Bolt,如果将 tuple 发射成多个流,这些流都可以通过 declareStream()来声明。Stream Groupings:Stream Grouping 定义了一个流在 Bolt 任务间该如何被切分。这里有 Storm 提供的 6 个Stream Grouping 类型:1. 随机分组(Shuffle grouping):随机分发 tuple 到 Bolt 的任务,保证每个任务获得相等数量的 tuple。2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据 “user-id”字段,相同“user-id”

8、的元组总是分发到同一个任务,不同“user-id ”的元组可能分发到不同的任务。3. 全部分组(All grouping):tuple 被复制到 bolt 的所有任务。这种类型需要谨慎使用。4. 全局分组(Global grouping):全部流都分配到 bolt 的同一个任务。明确地说,是分配给 ID 最小的那个 task。5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm 将把无分组的 Bolts 放到 Bolts 或 Spouts 订阅它们的同一线程去执行(如果可能) 。6. 直接分组(Direct grouping):这

9、是一个特别的分组类型。元组生产者决定 tuple 由哪个元组处理者任务接收。当然还可以实现 CustomStreamGroupimg 接口来定制自己需要的分组。项目实施当下情况我们需要给 Spout 和 Bolt 设计一种能够处理大量数据(日志文件)的 topology,当一个特定数据值超过预设的临界值时促发警报。使用 Storm 的 topology,逐行读入日志文件并且监视输入数据。在 Storm 组件方面,Spout 负责读入输入数据。它不仅从现有的文件中读入数据,同时还监视着新文件。文件一旦被修改 Spout 会读入新的版本并且覆盖之前的 tuple(可以被 Bolt 读入的格式) ,

10、将 tuple 发射给 Bolt 进行临界分析,这样就可以发现所有可能超临界的记录。下一节将对用例进行详细介绍。临界分析这一节,将主要聚焦于临界值的两种分析类型:瞬间临界(instant thershold)和时间序列临界(time series threshold) 。瞬间临界值监测:一个字段的值在那个瞬间超过了预设的临界值,如果条件符合的话则触发一个 trigger。举个例子当车辆超越 80 公里每小时,则触发 trigger。时间序列临界监测:字段的值在一个给定的时间段内超过了预设的临界值,如果条件符合则触发一个触发器。比如:在 5 分钟类,时速超过 80KM 两次及以上的车辆。List

11、ing One 显示了我们将使用的一个类型日志,其中包含的车辆数据信息有:车牌号、车辆行驶的速度以及数据获取的位置。AB 123 60 North cityBC 123 70 South cityCD 234 40 South cityDE 123 40 East cityEF 123 90 South cityGH 123 50 West city这里将创建一个对应的 XML 文件,这将包含引入数据的模式。这个 XML 将用于日志文件的解析。XML 的设计模式和对应的说明请见下表。XML 文件和日志文件都存放在 Spout 可以随时监测的目录下,用以关注文件的实时更新。而这个用例中的 top

12、ology 请见下图。Figure 1:Storm 中建立的 topology,用以实现数据实时处理如图所示:FilelistenerSpout 接收输入日志并进行逐行的读入,接着将数据发射给ThresoldCalculatorBolt 进行更深一步的临界值处理。一旦处理完成,被计算行的数据将发送给 DBWriterBolt,然后由 DBWriterBolt 存入给数据库。下面将对这个过程的实现进行详细的解析。Spout 的实现Spout 以日志文件和 XML 描述文件作为接收对象。XML 文件包含了与日志一致的设计模式。不妨设想一下一个示例日志文件,包含了车辆的车牌号、行驶速度、以及数据的捕

13、获位置。 (看下图)Figure2:数据从日志文件到 Spout 的流程图Listing Two 显示了 tuple 对应的 XML,其中指定了字段、将日志文件切割成字段的定界符以及字段的类型。XML 文件以及数据都被保存到 Spout 指定的路径。Listing Two:用以描述日志文件的 XML 文件。vehicle_numberstringspeedintlocation string,通过构造函数及它的参数 Directory、PathSpout 和 TupleInfo 对象创建 Spout 对象。TupleInfo 储存了日志文件的字段、定界符、字段的类型这些很必要的信息。这个对象通

14、过XSTream 序列化 XML 时建立。Spout 的实现步骤:对文件的改变进行分开的监听,并监视目录下有无新日志文件添加。在数据得到了字段的说明后,将其转换成 tuple。声明 Spout 和 Bolt 之间的分组,并决定 tuple 发送给 Bolt 的途径。Spout 的具体编码在 Listing Three 中显示。Listing Three:Spout 中 open、nextTuple 和 delcareOutputFields 方法的逻辑。public void open( Map conf, TopologyContext context,SpoutOutputCollecto

15、r collector ) _collector = collector; try fileReader = new BufferedReader(new FileReader(new File(file); catch (FileNotFoundException e) System.exit(1); public void nextTuple() protected void ListenFile(File file) Utils.sleep(2000); RandomAccessFile access = null; String line = null; try while (line = access.readLine() != null) if (line !=null) String fields=null; if (tupleInfo.getDelimiter().equals(|) fields = line.split(+tupleInfo.getDelimiter(); else fields = line.split (tupleInfo.getDelimiter(); if (tupleInfo.getFieldList().size() = fields.length) _collector.emit(new Values(fields);

展开阅读全文
相关资源
相关搜索

当前位置:首页 > 建筑/环境 > 工程造价

电脑版 |金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号