Spark结构化数据流StructuredStreaming

上传人:桔**** 文档编号:486282822 上传时间:2024-01-20 格式:DOCX 页数:9 大小:17.55KB
返回 下载 相关 举报
Spark结构化数据流StructuredStreaming_第1页
第1页 / 共9页
Spark结构化数据流StructuredStreaming_第2页
第2页 / 共9页
Spark结构化数据流StructuredStreaming_第3页
第3页 / 共9页
Spark结构化数据流StructuredStreaming_第4页
第4页 / 共9页
Spark结构化数据流StructuredStreaming_第5页
第5页 / 共9页
点击查看更多>>
资源描述

《Spark结构化数据流StructuredStreaming》由会员分享,可在线阅读,更多相关《Spark结构化数据流StructuredStreaming(9页珍藏版)》请在金锄头文库上搜索。

1、Spark 结构化数据流 StructuredStreaming是一个构建在Spark SQL之上的一个高容错可扩展的流处理引擎。当然你的流计算也可以用相同的方式对静态数据进行批处理计算。它可以不断更新持续流进来的递增数据,并且将计算结果也持续的更新。目前的Dataset/DataFrame 的API支持的语言有 Scala,Java和Python三种语言。可以实现流聚合, 事件窗口,Join聚合等。结构化流处理是建立在 Spark SQL优化引擎Catylist之上的又一引擎。所以,其性能也是 非常好的。未来的发展的空间也是巨大的。该种流处理保证有且只处理数据一次,同时实现端到端的数据。通过

2、 检查点checkpoint和Write Ahead Logs 机制实现高容错。在Spark2.1版本中,其API仍然还是试验性质的,接下来我们就来一起走进StructuredStreaming 的殿堂吧。import org.apache.spark.sql.SparkSessionimport org.apache.spark.SparkConf object Test def main( args : Array String ): Unit = /* 配置信息:配置应用名称StructuredStreaming Test* Master 为 local3* /val conf = ne

3、w SparkConf().setAppName( StructuredStreaming Test ).setMaster (local3)val spark = SparkSession .builder ().config ( conf).getOrCreate ()import spark.implicits._/获取一行一行的数据,注意,此时的 host对应的、/master为数据源那台机器的主机名/弁且已经配置好了 hosts文件里的ip与主机映射/此时的lines是一个DataFrame对象val lines = sparkreadStreamformat (socket)opt

4、ion (host , master)option (port , 9999)load ()/用Dataset里的flatMap方法将每行转换为一个一 个的 单词val words = lines . as String . flatMap ( . split ()/单词计数val wordCounts = words. groupBy (value ). count ()/返回一个流查询对象val query =wordCounts. writeStream . outputMode (complet).format (console).start ()/等待终端query. awaitTe

5、rmination ()图 1-1 StructuredNetworkWordCount代码实现以上这段代码是处理的数据源是一台主机名为master的9999端口。通过监听 TCP端口实时监听获取流数据源。因此,我们需要通过以下的方式来开启TCP端口的监听nc -lk 9999通过观察console的终端输出的结果,我们可以判定,这段程序处理的是以10秒为单位的微批数据,也就是说,每隔10秒钟进行一次计算。此外,你还可以通过 Spark官方提供的案例来运行你的第一个结构化流处理的应用程序。在Spark官方提供的二进制包里面的bin目录下有一个run-examples的脚本,当加上org.apa

6、che.spark.examples.sql.streaming.StructuredNetworkWordCount的参数后,运行这个脚本,这样也能运行您的第一个结构化流处理程序。运行了第一个结构化流程序之后,我们再回过头来看看 Spark源码中提供给我们的三类案例。这三类案例在 Spark源码包中的路径为examples/src/main/java/org.apache.spark.sql.streaming/streaming 和examples/src/main/scala/org.apache.spark.sql.streaming/streaming(源码的导入方法详见第 xxx章

7、第xxx节)在Java代码包路径下,一共有三个案例,分别为 JavaStructuredNetworkWo rdCount,Java StructuredKafkaworkWordCount,Java StructuredNetworkWordCountWindowed 。在Scala代码包路径下,也是有三个案例,分别为 StructuredNetworkWordC ount,StructuredKafkaworkWordCount,StructuredNetworkWordCountWindowed。Java StructuredNetworkWordCount 和 StructuredNe

8、tworkWordCoun t的实现方法和图1-1是一样的。那么我们接下来以kafka作为数据源来试验一下结构化流处理的实现方法。首先创建一个类 StructuredKafkaworkWordCountTest这里还有俩个案例没有试验完成:程序模型介绍:结构化流处理Structured Streaming 的关键的思想是将一个实时流数据看做是一张持 续动态追加数据的一张表。这种思想产生了一种新的流处理模型。这种流处理模型 从某种程度上与批处理模型相似。通过这种模型,我们可以采用批处理编程模型类 似的方式进行编程,从而对流数据进行处理,就像标准批处理一样,对静态表数据 进行查询,只不过这种查询通

9、常会伴随着频繁的增量数据查询。Spark官方也把这种处理数据的方式叫做类批处理。值得注意的是,在图1-1的程序中,outputMode()的参数有三种方式,也就是说流 数据写出到外部的存储系统有三种方式:CompleteAppendUpdate基本组件:Concepts将输入的数据流当做一张表,每一条新的数据流进来的时候,就被追加到新的输入 表中,基于输入的查询操作将会生成一张结果表,无论什么时候结果表进行了更新, 我们都将要写数据到外部的下游数据系统中。处理Event-Time和延迟数据本身就内嵌到了 Event-time之中了,对于一些应用来说,有可能会基于event-time进行统计,比

10、如,如果你想要统计IoT设备每一分钟产生的数据的数量。然后,你可能想基于数据产生的时间来统计,而不是数据到达Spark这一端时的处理时间,event这个概念在结构化流处理中非常白有用。设备产生的每一个event在表中就是一行,而event-time就是该行中的一个字段属性,这使得基于window的聚合操作非常方便。实质上,所谓的窗口函数,其实就是根据 event-time或者 Process-time进行特殊的分组,然后聚合。每个窗口,就是一组。此外,这个模型很自然的解决了处理延迟数据的问题,究竟要不要将延迟的数据加 入统计结果当中。如果要加,那么是将所有的都加吗?还是符合条件的才加,也就 是

11、延迟时间超过一定大小,我们就丢掉延迟数据,反之,我们仍然将数据添加进来。Spark官方对此应用了 watermarking 的概念。通过 watermarking 来作为评判标准和二 者的分界线。高容错语义的实现端到端的exactly-once语义是结构化流处理设计出来的关键目标。Spark官方为了实现它,因而设计了结构化流处理的sources , sinks以及执行引擎engine去追踪数据的处理的进度,从而 可以处理重新开始或者数据重复消费等任何情况下的各种失败。首先,我们假设 每个数据源都有offsets偏移量(类似kafka的偏移量),我们定位到了上一次消费的流数 据的位置,引擎用 c

12、heckpinting和write ahead logs 去记录偏移量。同时,结构化流sinks被设计成是哥等的。这样,二者合在一起,通过可复制的数据源,Write Ahead Logs 记录offset , checkpoint记录state ,数据处理后的下游sinks又是哥等的。最终就可以做到exactly once 。Spark2.0以及之后,DataFrame和Dataset都可以通过 SparkSession 对象来静态调用对应的方法来创建,如果您还不太熟悉SparkSession ,那就应该多看看 Spark1.6后的Spark2.0 了。SparkSession.readStr

13、eam() 会返回一个 DataStreamReaderData Source File source Kafka source Socket source (for testing)DataStreamReaderorg.apache.spark.sql在Spark2.1.0的源码中是一个final类型的类,其只能是包下的其他类才能访问。其成员方法format()指定输入数据源的数据格式,schema ()指定数据的结构信息。一些像 Json这样的数据格式,DataStreamReader可以推断出来它的数据结构,所以不用显示定义其schema。但是如果显示指定了其schema的话,加载数据

14、的时候会跳过这个步骤,从而加速读取数据过程。除此之外,还有如下的一些重要的方法。在开发和性能调优的过程中可能会经常用到。Option() 方法:该方法通过传入字符串key和value 的值,从而显示指定应用程序的配置属性值,比如应用的名称,应用的 Master等。每调用一次,就返回一个新的DataStreamReader 。Load()方法:该方法返回一个 的一种特殊形式,但是 2.1.0DataSet对象(也可以说是 Dataframe 对象,Dataframe 是 Dataset版本的源码中又没有这个类存在了)。outputMode()指定数据以怎样的方式写出,2.1.0 版本目前有三种,

15、一种是 complete,种是 append, 还有种是 update( 详细请看 xxxx)Trigger()该方法指定每隔多长时间写出一次数据到外部系统,比如ProcessingTime(4.seconds)queryName指定结构化流操作的名字,方便以后对该流操作的查询。partitionBy将写出的数据根据指定字段进行排序操作。start开始执行流查询,并且能够持续的将不断计算的结果输出到指定的地方。foreach通过传入一个ForeachWriter匿名类,从而通过实现匿名类的方法,打开新的连接,并且通过连接写数据到连接中。format指定输出数据格式,比如parquet 等压缩格式。options指定输出数据的配置属性值DataStreamWriter 同DataStreamReader 类似,也是final 类型,且访问权限也只是开放给sql包。其主要的方法和作用如下所示对于progress ,这

展开阅读全文
相关资源
相关搜索

当前位置:首页 > 办公文档 > 演讲稿/致辞

电脑版 |金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号