spark分享分析共48页课件

上传人:re****.1 文档编号:569529569 上传时间:2024-07-30 格式:PPT 页数:48 大小:3.92MB
返回 下载 相关 举报
spark分享分析共48页课件_第1页
第1页 / 共48页
spark分享分析共48页课件_第2页
第2页 / 共48页
spark分享分析共48页课件_第3页
第3页 / 共48页
spark分享分析共48页课件_第4页
第4页 / 共48页
spark分享分析共48页课件_第5页
第5页 / 共48页
点击查看更多>>
资源描述

《spark分享分析共48页课件》由会员分享,可在线阅读,更多相关《spark分享分析共48页课件(48页珍藏版)》请在金锄头文库上搜索。

1、Spark分享目录Spark简介Spark批处理Spark集群模式SparkSQLSpark StreamingSpark简介Spark是什么Spark特点Spark生态系统Spark与Hadoop的区别PPT模板下载:1ppt/moban/Spark是什么官网介绍:官网介绍:Apache Sparkisafastandgeneralengineforlarge-scaledataprocessing.Spark是加州大学伯克利分校AMP实验室开发的通用内存并行计算框架,分布式资源管理工作交由集群管理工具(Mesos、YARN)PPT模板下载:1ppt/moban/Spark特点1.先进架构:

2、Spark采用Scala语言编写,底层采用actormode的akka作为通讯架构,代码十分简洁高效。基于DAG图的执行引擎,减少多次计算之间中间结果写到HDFS的开销。建立在统一抽象的RDD(分布式内存抽象)之上,使得它可以以基本一致的方式应对不同的大数据处理场景。2.运行速度快:提供Cache机制来支持需要反复迭代的计算,减少数据读取的IO开销3.易用性好:Spark提供广泛的数据集操作类型(各种转换算子,行动算子等)Spark支持Java,Python和ScalaAPI,支持交互式Python和Scala的shell4.通用性强:以其RDD模型的强大表现能力,逐渐形成了一套自己的生态圈,

3、提供了full-stack的解决方案。主要包括Spark内存中批处理,SparkSQL交互式查询,SparkStreaming流式计算,Mllib机器学习算法,GraphX图计算。5.与Hadoop无缝衔接:Spark可以使用YARN作为它的集群管理器读取HDFS,HBASE的Hadoop的数据PPT模板下载:1ppt/moban/Spark生态圈也称为BDAS(伯克利数据分析栈),是伯克利APMLab实验室打造的,力图在算法(Algorithms)、机器(Machines)、人(People)之间通过大规模集成来展现大数据应用的一个平台。Spark生态圈以SparkCore为核心,从HDFS

4、、Cassandra、AmazonS3和HBase等持久层读取数据,以MESS、YARN和自身携带的Standalone为资源管理器调度Job完成Spark应用程序的计算。这些应用程序可以来自于不同的组件,如SparkShell/SparkSubmit的批处理、SparkStreaming的实时流处理应用、SparkSQL的即席查询、MLlib的机器学习、GraphX的图处理和SparkR的数学计算等等。Spark生态系统PPT模板下载:1ppt/moban/Spark与Hadoop的区别HadoopSpark抽象层次低,需要手工编写代码来完成,使用上难以上手基于RDD的抽象,使数据处理逻辑的

5、代码非常简短只提供两个操作,Map和Reduce,表达力欠缺提供很多转换和动作,很多基本操作如Join,GroupBy已经在RDD转换和动作中实现中间结果也放在HDFS文件系统中中间结果放在内存中,内存放不下了会写入本地磁盘ReduceTask需要等待所有MapTask都完成后才可以开始分区相同的转换构成流水线放在一个Task中运行,分区不同的转换需要Shuffle,被划分到不同的Stage中,需要等待前面的Stage完成后才可以开始时延高,只适用Batch数据处理,对于交互式数据处理,实时数据处理的支持不够通过将流拆成小的batch提供DiscretizedStream处理流数据对于迭代式数

6、据处理性能比较差通过在内存中缓存数据,提高迭代式计算的性能历经10年发展,已在生产环境稳定运行多年运行不够稳定PPT模板下载:1ppt/moban/Spark与Hadoop的区别Hadoop数据抽取运算模型:反复读写,磁盘IO是瓶颈Spark数据抽取运算模型:Spark批处理RDD简介Spark程序入口创建RDDRDD操作TransformationsActionsPPT模板下载:1ppt/moban/RDD(ResilientDistributedDataset):弹性分布式数据集-分布式内存抽象的概念RDD是Spark对数据的核心抽象,是Spark的基石。RDD是一个可容错、只读的、已被分

7、区的、可并行操作的分布式元素集合RDD的特点:1.只读:状态不可变,不能修改2.分区:支持元素根据Key来分区(Partitioning),保存到多个结点上,还原时只会重新计算丢失分区的数据,而不会影响整个系统3.RDD必须是可序列化的4.路径:在RDD中叫血统(lineage),即RDD有充足的信息关于它是如何从其他RDD产生而来的5.持久化:可以控制存储级别(内存、磁盘等)来进行持久化在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示,即数据分区的集合,能根据本地性快速访问到数据的偏好位置,依赖关系,计算方法,是否是哈希/范围分区的元数据:RDD简介操作操作含义含义partiti

8、ons()返回一组Partition对象preferredLocations(p)根据数据存放的位置,返回分区p在哪些节点访问更快dependencies()返回一组依赖iterator(p,parentIters)按照父分区的迭代器,逐个计算分区p的元素(计算函数)partitioner()返回RDD是否hash/range分区的元数据信息PPT模板下载:1ppt/moban/SparkContext:Spark应用程序需要做的第一件事就是创建一个SparkContext对象,SparkContext对象决定了Spark如何访问集群。而要新建一个SparkContext对象,你还得需要构造一

9、个SparkConf对象,SparkConf对象包含了你的应用程序的配置信息。每个JVM进程中,只能有一个活跃(active)的SparkContext对象。如果你非要再新建一个,那首先必须将之前那个活跃的SparkContext对象stop()掉。Spark中已经有一个创建好的SparkContext,简称scScala:importorg.apache.spark.SparkContextimportorg.apache.spark.SparkConfvalconf=newSparkConf().setAppName(appName).setMaster(master)valsc=newS

10、parkContext(conf)Python:frompysparkimportSparkConf,SparkContextconf=SparkConf().setAppName(appName).setMaster(master)sc=SparkContext(conf=conf)Spark程序入口PPT模板下载:1ppt/moban/一、并行化集合一、并行化集合并行化集合是以一个已有的集合对象(例如:ScalaSeq)为参数,调用SparkContext.parallelize()方法创建得到的RDD。集合对象中所有的元素都将被复制到一个可并行操作的分布式数据集中。Scala:Pytho

11、n:valdata=Array(1,2,3,4,5)data=1,2,3,4,5valdistData=sc.parallelize(data)distData=sc.parallelize(data)二、外部数据集二、外部数据集Spark可以通过Hadoop所支持的任何数据源来创建分布式数据集,包括:本地文件系统、HDFS、Cassandra、HBase、AmazonS3等。Spark支持的文件格式包括:文本文件(textfiles)、SequenceFiles,以及其他Hadoop支持的输入格式(InputFormat)。Scala:valdistFile=sc.textFile(data

12、.txt)Python:distFile=sc.textFile(data.txt)三、通过转换现有的三、通过转换现有的RDD得到得到四、改变现有四、改变现有RDD的持久性的持久性(cache、save)创建RDDPPT模板下载:1ppt/moban/日志挖掘:vallines=sc.textFile(“hdfs:/.”)valerrors=lines.filter(_.startsWith(“ERROR”)errors.cache()errors.filter(_.contains(“HDFS”).map(_.split(t)(3).take(10)作用于RDD上的操作分为转换(transf

13、ormation)和动作(action)。Spark中的所有transformation都是惰性的,在执行transformation,并不会提交Job,只有在执行action操作,才会被提交到集群中真正的被执行。Transformation:将已有RDD转换得到一个新的RDD。Action:计算,返回结果或把RDD数据写到存储系统中。RDD操作PPT模板下载:1ppt/moban/窄依赖(narrowdependencies):父RDD的每个分区都只被子RDD的一个分区所依赖比如map、filter、union等宽依赖(widedependencies):父RDD的分区被多个子RDD的分区所

14、依赖。比如groupByKey、reduceByKey等1.程序优化:窄依赖支持在一个节点上管道化执行。如在filter之后执行map2.容错:窄依赖支持更高效的故障还原。只有丢失的父RDD的分区需要重新计算宽依赖需要所有父RDD的分区,因此就需要完全重新执行Checkpoint:Lineage链较长、宽依赖的RDD需要采用检查点机制。RDD依赖PPT模板下载:1ppt/moban/Transformation作用作用示例示例结果结果map(func)返回一个新的数据集,其中每个元素都是由源RDD中一个元素经func转换得到的Rdd.map(x=x+1)1,2,3,32,3,4,4filter

15、(func)返回一个新的数据集,其中包含的元素来自源RDD中元素经func过滤后的结果rdd.filter(x=x!=1)1,2,3,32,3,3flatMap(func)类似于map,但每个输入元素可以映射到0到n个输出元素Rdd.flatMap(x=x.split(“)“helloworld”,”hi”“hello”,”world”,”hi”union(otherDataset)返回源数据集和参数数据集的并集Rdd.union(other)1,2,33,4,51,2,3,3,4,5distinct(numTasks)返回对源数据集做元素去重后的新数据集Rdd.distinct()1,2,3

16、,31,2,3groupByKey(numTasks)若源RDD包含(K,V)对,则返回一个新的数据集包含(K,Iterable)对Rdd.groupByKey()Rdd=(1,2),(3,4),(3,6)(1,2),(3,4,6)reduceByKey(func,numTasks)若源RDD为(K,V)对,则为(K,V)对的RDD,每个key对应的value是经过func聚合后的结果Rdd.reduceByKey(x,y)=x+y)Rdd=(1,2),(3,4),(3,6)(1,2),(3,10)join(otherDataset,numTasks)若源RDD为(K,V)且参数RDD为(K,

17、W),则返回的新RDD中将包含内关联后key对应的(K,(V,W)对Rdd.join(other)Rdd=(1,2),(3,4),(3,6)Other=(3,9)(3,(4,9),(3,(6,9)TransformationsPPT模板下载:1ppt/moban/Action作用作用示例示例1,2,3,3结果结果reduce(func)将RDD中元素按func进行聚合Rdd.reduce(x,y)=x+y)9collect()将数据集中所有元素以数组形式返回驱动器(driver)程序。Rdd.collect()1,2,3,3count()返回数据集中元素个数Rdd.count()4first(

18、)返回数据集中首个元素(类似于take(1))Rdd.first()1take(n)返回数据集中前n个元素Rdd.take(2)1,2saveAsTextFile(path)将数据集中元素保存到指定目录下的文本文件中,支持本地文件系统、HDFS或者其他任何Hadoop支持的文件系统。saveAsSequenceFile(path)将数据集中元素保存到指定目录下的HadoopSequence文件中,支持本地文件系统、HDFS或者其他任何Hadoop支持的文件系统。saveAsObjectFile(path)将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用SparkContext

19、.objectFile来读取。ActionsSpark集群模式Spark完整示例集群模式概览术语解释Stage划分Spark任务调度Spark运行模式PPT模板下载:1ppt/moban/Spark完整示例Scala:submit提交:spark-submit-masteryarn-classcom.xxxAppName.jarimportorg.apache.spark.SparkContextimportorg.apache.spark.SparkConfobjectSimpleAppdefmain(args:ArrayString)vallogFile=YOUR_SPARK_HOME/R

20、EADME.mdvalconf=newSparkConf().setAppName(SimpleApplication)valsc=newSparkContext(conf)vallogData=sc.textFile(logFile,2).cache()valnumAs=logData.filter(line=line.contains(a).count()valnumBs=logData.filter(line=line.contains(b).count()println(Lineswitha:%s,Lineswithb:%s.format(numAs,numBs)Python:subm

21、it提交:spark-submit-masteryarn-executor-memory10gAppName.pyfrompysparkimportSparkContextlogFile=YOUR_SPARK_HOME/README.mdsc=SparkContext(local,SimpleApp)logData=sc.textFile(logFile).cache()numAs=logData.filter(lambdas:ains).count()numBs=logData.filter(lambdas:bins).count()print(Lineswitha:%i,lineswith

22、b:%i%(numAs,numBs)PPT模板下载:1ppt/moban/在Spark中由SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。Spark会为该应用在各个集群节点上申请executor,用于执行计算任务和存储数据。接下来,Spark将应用程序代码(JAR包或者Python文件)发送给所申请到的executor。最后SparkContext将分割出的task发送给各个executor去运行。集群模式概览PPT模板下载:1ppt/moban/注意:1.每个

23、Spark应用程序都有其对应的多个executor进程,executor进程在整个应用程序生命周期内,都保持运行状态,并以多线程方式运行所收到的任务。好处:可以隔离各个Spark应用,从调度角度来看,每个driver可以独立调度本应用程序内部的任务,从执行器角度来看,不同的Spark应用对应的任务将会在不同的JVM中运行。坏处:多个Spark应用程序之间无法共享数据,除非把数据写到外部存储中。2.Spark对底层的ClusterManager一无所知。只要Spark能申请到executor进程,并且能与之通信即可。3.driver在整个生命周期内必须监听并接受其对应的各个Executor的连接

24、请求。因此,driver必须能够被所有worker节点访问到。4.因为集群上的任务是由driver来调度的,所以driver应该和worker节点距离近一些,最好在同一个本地局域网中。如果你需要远程对集群发起请求,最好还是在driver节点上启动RPC服务,来响应这些远程请求,同时把driver本身放在集群worker节点比较近的机器上。集群模式概览PPT模板下载:1ppt/moban/术语术语描述描述Application用户编写的Spark应用程序,包含一个Driverprogram和分布在集群中多个节点上运行的若干ExecutorDriverprogram运行Application的ma

25、in()函数并且创建SparkContextClustermanager在集群上获取资源的外部服务(例如:Standalone、Mesos、Yarn)Workernode集群中任何可以运行Application代码的节点ExecutorApplication运行在Worker节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都有各自独立的一批ExecutorTask被送到某个Executor上的工作单元Job包含多个Task组成的并行计算,往往由SparkAction催生Stage每个Job会被拆分很多组Task,每组任务被称为Stage,也

26、可称TaskSet,一个作业分为多个阶段DAGScheduler实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Taskset放到TaskScheduler中TaskScheduler与DAGScheduler交互,实现Task分配到Executor上执行术语解释PPT模板下载:1ppt/moban/Stage划分stage的边界有两种情况:1.宽依赖上的Shuffle操作;2.已缓存分区,它可以缩短父RDD的计算过程。一个stage的开始就是从外部存储或shuffle结果中读取数据,一个stage的结束就是发生s

27、huffle或生成结果时PPT模板下载:1ppt/moban/1.创建RDD,经过一系列Transformation,最后Action2.Action会触发SparkContext的rujob方法,交给DAGScheduler处理3.DAGScheduler将DAG划分成Stage4.将Stage交给TaskScheduler5.集群的Executor上运行Spark任务调度PPT模板下载:1ppt/moban/一个按A-Z首字母分类,查找相同首字母下不同姓名总个数的例子:步骤1:创建RDD上面的例子除去最后一个collect是个动作,不会创建RDD之外,前面四个转换都会创建出新的RDD。因此

28、第一步就是创建好所有RDD(内部的五项信息)。步骤2:创建执行计划Spark会尽可能地管道化,并基于是否要重新组织数据来划分阶段(stage),例如本例中的groupByKey()转换就会将整个执行计划划分成两阶段执行。最终会产生一个作为逻辑执行计划。步骤3:调度任务将各阶段划分成不同的任务(task),每个任务都是数据和计算的合体。在进行下一阶段前,当前阶段的所有任务都要执行完成。sc.textFile(“hdfs:/names”).map(name=(name.charAt(0),name).groupByKey().mapValues(name=names.toSet.size).col

29、lect()RDD执行过程PPT模板下载:1ppt/moban/运行环境运行环境模式模式描述描述Local单机模式常用于本地开发测试,分为local单线程和local-cluster多线程Standalone集群模式Spark自带,最简单的集群模式HadoopYarn集群模式运行在Yarn资源管理器之上,由Yarn负责资源管理,Spark负责任务调度和计算ApacheMesos集群模式运行在Mesos资源管理器之上,由Yarn负责资源管理,Spark负责任务调度和计算AmazonEc2集群模式运行在云端的集群Spark运行模式SparkSQLSparkSQL简介SparkSQL入口DataFr

30、ame简介DataFrame创建DataFrame操作和RDD互操作PPT模板下载:1ppt/moban/SQL-on-Hadoop:Hive是SQL-on-Hadoop最常用的工具,但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率,为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现较为突出的是:MapR的DrillCloudera的ImpalaHortonworks的HiveonTezFacebook的PrestoSparkSQL(Shark-HiveonSpark)但Shark对于Hive的太多依赖,制约

31、了Spark各个组件的相互集成,所以提出了SparkSQL项目。SparkSQL简介PPT模板下载:1ppt/moban/入口:入口:SQLContext:SparkSQL的所有功能入口都是SQLContext类及其子类。要创建一个SQLContext对象,首先需要有一个SparkContext对象。Scala:Valsc:SparkContext/假设已经有一个SparkContext对象ValsqlContext=neworg.apache.spark.sql.SQLContext(sc)Python:sqlContext=SQLContext(sc)HiveContext:HiveCon

32、text继承自SQLContext,除了SQLContext的功能之外,HiveContext还提供了完整的HiveQL语法,UDF使用,以及对Hive表中数据的访问。Scala:ValHiveContext=neworg.apache.spark.sql. hive.HiveContext(sc)Python:HiveContext=HiveContext(sc)SparkSQL入口PPT模板下载:1ppt/moban/DataFrame:是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表或者R和Python中的dataframe等价,只不过在底层,Dat

33、aFrame采用了更多优化。DataFrame可以从很多数据源加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。相对于RDD,DataFrame有几个特点:1.包含schema信息,能够进行针对性的优化。2.对用户有更加友好、更直观的API。3.与外部数据源API紧密集成,可以用作多种存储格式和存储系统间的数据交换媒介。DataFrame简介PPT模板下载:1ppt/moban/创建DataFrame1.从json文件创建DataFrame:val df = hiveContext.read.json(examples/src/main/resources/

34、people.json)2.从parquet文件创建DataFrame:val df = hiveContext.read.parquet(examples/src/main/resources/people.parquet)3.从orc文件创建DataFrame:val df = hiveContext.read.orc(examples/src/main/resources/people.orc)4.从hive表创建DataFrame:val df =hiveContext.table(gdpi)5.从txt文件创建DataFrame:val df =hiveContext.read.te

35、xt(/path/to/spark/README.md)PPT模板下载:1ppt/moban/DataFrame操作1.展示DataFrame的内容df.show()/agename/nullMichael/30Andy/19Justin2.展示所有人,但所有人的age都加1df.select(df(name),df(age)+1).show()/name(age+1)/Michaelnull/Andy31/Justin203.计算各个年龄的人数df.groupBy(age).count().show()/agecount/null1/191/301PPT模板下载:1ppt/moban/Dat

36、aFrame操作SQL操作:1.首先把DataFrame注册为临时表df.registerTempTable(people)2.HiveContext.sql执行SQL查询,并返回DataFrame,语法与Hql一致valteenagers=hiveContext.sql(SELECTname,ageFROMpeopleWHEREage=13ANDagePerson(p(0),p(1).trim.toInt).toDF()people.registerTempTable(people)/sqlContext.sql方法可以直接执行SQL语句valteenagers=sqlContext.sql

37、(SELECTname,ageFROMpeopleWHEREage=13ANDageStructField(fieldName,StringType,true)/将RDDpeople的各个记录转换为Rows,即:得到一个包含Row对象的RDDvalrowRDD=people.map(_.split(,).map(p=Row(p(0),p(1).trim)/将schema应用到包含Row对象的RDD上,得到一个DataFramevalpeopleDataFrame=sqlContext.createDataFrame(rowRDD,schema)/将DataFrame注册为tablepeople

38、DataFrame.registerTempTable(people)/执行SQL语句valresults=sqlContext.sql(SELECTnameFROMpeople)PPT模板下载:1ppt/moban/和RDD互操作编程方式定义编程方式定义Schema:Python:Frompyspark.sqlimportSQLContextFrompyspar.sql.typesimport*sqlContext=SQLContext(sc)/加载文件Lines=sc.textFile(“examples/src/main/resources/people.txt”)Parts=lines

39、.map(lambdal:l.split(“,”)/转换每一行为元组People=parts.map(lambdap:(p0,p1.strip()/定义schemaschemaString=“nameage”Fields=StructField(field_name,StringType(),True)forfield_nameinschemaString.split()Schema=StructType(fields)/将schema应用到RDD上schemaPeople=sqlContext.createDataFrame(people,schema)/注册为表schemaPeople.r

40、egisterTempTable(“people”)Results=sqlContext.sql(“SELECTnameFROMpeople”)SparkStreamingSpark Streaming简介Spark Streaming与Strom的对比Spark Streaming工作原理Spark Streaming编程离散数据流转换和输出算子PPT模板下载:1ppt/moban/SparkStreaming简介SparkStreaming能够实现对实时数据流的流式处理,并具有很好的可扩展性、高吞吐量和容错性。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ

41、、Kinesis以及TCPsockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库数据库和现场仪表盘。在“OneStackrulethemall”的基础上,还可以使用Spark的其他子框架,如机器学习、图计算等,对流数据进行处理。PPT模板下载:1ppt/moban/SparkStreaming与Strom的区别StromSpark Streaming传入一个处理一个,吞吐量相对小批处理,吞吐量大实现真正流式实时的处理数据,延迟在秒级以下,实时性很高本质还是批量处理,在短的时间窗口内进

42、行数据实时处理,延迟在秒级左右,实时性相对较弱每个单独的记录当它通过系统时必须被跟踪,所以Storm能够至少保证每个记录将被处理一次,但是在从错误中恢复过来时候允许出现重复记录能够保证每个批处理的所有数据只处理一次,保证数据不会在恢复的时候错乱(批处理重新执行)Clojure语言开发Scala语言开发提供javaAPI提供java、pythonAPI2019年开始就在Twitter内部生产环境中使用,已经非常成熟2019年才陆续有一些公司开始试用,但最近越来越多公司已经在生产中使用PPT模板下载:1ppt/moban/SparkStreaming工作原理SparkStreaming从实时数据流

43、接入数据,再将其划分为一个个小批量供后续Sparkengine处理,最终得到处理后的一批批结果数据。处理流程:1.SparkStreaming把实时输入数据流以时间片t(如1秒)为单位切分成块2.SparkStreaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据3.每个块都会生成一个SparkJob处理4.最终结果也返回多块PPT模板下载:1ppt/moban/SparkStreaming编程入口:StreamingContext是SparkStreaming的入口。valconf=newSparkConf().setMaster(local2).setAppName(N

44、etworkWordCount)valssc=newStreamingContext(conf,Seconds(1)/已有sparkcontext的情况valssc=newStreamingContext(sc,Seconds(1)Python:Sc=SparkContext(“local2”,”NetworkWordCount”)Ssc=StreamingContext(sc,1)步骤:1.创建DStream对象,并定义好输入数据源。2.基于数据源DStream定义好计算逻辑和输出。3.调用streamingContext.start()启动接收并处理数据。4.调用streamingCont

45、ext.awaitTermination()等待流式处理结束。5.你可以主动调用streamingContextssc.stop()来手动停止处理流程。注意点:1.一旦streamingContext启动,就不能再对其计算逻辑进行添加或修改。2.一旦streamingContext被stop掉,就不能restart。3.单个JVM虚机同一时间只能包含一个active的StreamingContext。PPT模板下载:1ppt/moban/SparkStreaming编程Spark-shell中:importorg.apache.spark._importorg.apache.spark.str

46、eaming._valssc=newStreamingContext(sc,Seconds(1)/创建StreamingContext,批次间隔为1秒vallines=ssc.socketTextStream(“192.168.5.2,9998)/创建一个连接到hostname:port的Dstreamvalwords=lines.flatMap(_.split(“”)/将每一行分割成多个单词valpairs=words.map(word=(word,1)/对每一批次中的单词进行计数valwordCounts=pairs.reduceByKey(_+_)wordCounts.print()/将

47、该DStream产生的RDD的头十个元素打印到控制台上ssc.start()/启动流式计算ssc.awaitTermination()/等待直到计算终止Python:FrompysparkimportSparkComtextFrompyspark.streamingimportStreamingContextWords=lines.flatMap(lambdaline:line.split(“”)/将每一行分割成多个单词Pairs=words.map(lambdaword:(word,1)/对每一批次中的单词进行计数wordCounts=pairs.reduceByKey(lambdax,y:

48、x+y)wordCount.pprint()/将该DStream产生的RDD的头十个元素打印到控制台上PPT模板下载:1ppt/moban/离散数据流离散数据流 (DStreams)Dstream:离散数据流是SparkStreaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD组成的,每个RDD都是不可变、分布式的数据集。每个RDD都包含了特定时间间隔内的一批数据。任何作用于DStream的算子,其实都会被转化为对其内部RDD的操作。例如,在前面的例子中,我们将lines这个DStream转成wor

49、dsDStream对象,其实作用于lines上的flatMap算子,会施加于lines中的每个RDD上,并生成新的对应的RDD,而这些新生成的RDD对象就组成了words这个DStream对象。PPT模板下载:1ppt/moban/Transformation算子Transformation用途用途transform(func)返回一个新的DStream,其包含的RDD为源RDD经过func操作后得到的结果。利用该算子可以对DStream施加任意的操作updateStateByKey(func)返回一个包含新”状态”的DStream。源DStream中每个key及其对应的values会作为fu

50、nc的输入,而func可以用于对每个key的“状态”数据作任意的更新操作。Transform算子:可以用tranform算子来包装任何DStreamAPI所不支持的RDD算子ValspamInfoRDD=ssc.sparkContext.newAPIHadoopRDD()/包含垃圾信息的RDDValcleanedDstream=wordCounts.transform(rdd=rdd.join(spamInfoRDD).filter()/将Dstream中的RDD和spamInfoRDD关联,并实时过滤垃圾数据.)updateStateByKey算子:统计数据流中每个单词的出现次数。Defup

51、dateFunction(newValues:SeqInt,runningCount:OptionInt):OptionInt=valnewCount=./将新的计数值和之前的状态值相加,得到新的计数值Some(newCount)ValrunningCounts=pairs.updateStateByKeyInt(updateFunction_)PPT模板下载:1ppt/moban/窗口算子SparkStreaming同样也提供基于时间窗口的计算,也就是说,你可以对某一个滑动时间窗内的数据施加特定tranformation算子。每次窗口滑动时,源DStream中落入窗口的RDDs就会被合并成新

52、的windowedDStream。windowlength(窗口长度)窗口覆盖的时间长度slidinginterval(滑动距离)窗口启动的时间间隔/每隔10秒归约一次最近30秒的数据ValwindowedWordCounts=pairs.reduceByKeyAndWindow(a:Int,b:Int)=(a+b),Seconds(30),Seconds(10)PPT模板下载:1ppt/moban/窗口算子窗口算子窗口算子用途用途window(windowLength,slideInterval)将源DStream窗口化,并返回转化后的DStreamcountByWindow(windowL

53、ength,slideInterval)返回数据流在一个滑动窗口内的元素个数reduceByWindow(func,windowLength,slideInterval)用func做聚合,返回一个单元素数据流reduceByKeyAndWindow(func,windowLength,slideInterval,numTasks)基于(K,V)键值对DStream,将一个滑动窗口内的数据进行聚合,返回一个新的包含(K,V)键值对的DStream,其中每个value都是各个key经过func聚合后的结果reduceByKeyAndWindow(func,invFunc,windowLength,

54、slideInterval,numTasks)当新的数据进入窗口时,这些values会被输入func做归约计算,而这些数据离开窗口时,对应的这些values又会被输入invFunc做”反归约”计算countByValueAndWindow(windowLength,slideInterval,numTasks)基于包含(K,V)键值对的DStream,返回新的包含(K,Long)键值对的DStream。其中的Longvalue都是滑动窗口内key出现次数的计数PPT模板下载:1ppt/moban/输出算子输出算子可以将DStream的数据推送到外部系统,如:数据库或者文件系统。因为输出算子会将

55、最终完成转换的数据输出到外部系统,因此只有输出算子调用时,才会真正触发DStreamtransformation算子的真正执行。如果你的Streaming应用中没有输出算子,那么这个应用只会接收数据,而不会处理数据,接收到的数据最后只是被简单地丢弃掉了。输出算子输出算子用途用途print()在driver节点上打印DStream每个批次中的头十个元素。saveAsTextFiles(prefix,suffix)将DStream的内容保存到文本文件。saveAsObjectFiles(prefix,suffix)将DStream内容以序列化Java对象的形式保存到顺序文件中。saveAsHadoopFiles(prefix,suffix)将DStream内容保存到Hadoop文件中。foreachRDD(func)接收一个函数func,func将作用于DStream的每个RDD上。谢谢观赏!谢谢!

展开阅读全文
相关资源
正为您匹配相似的精品文档
相关搜索

最新文档


当前位置:首页 > 建筑/环境 > 施工组织

电脑版 |金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号