第5章 SPARK技术基础及构建SPARK集群本章内容本章首先讲解Spark的核心机制,然后介绍Hive、HBase、Kafka、Flume的原理及实践,并继续讲解这些组件的安装部署流程,最后借助两个案例讲解Spark Streaming和Spark MLlib的实际应用本章要点了解Spark的核心机制,熟悉Spark Shell操作;熟悉Hive、HBase、Kafka、Flume组件的原理及架构;掌握Spark集群搭建的方法以及其他组件的部署方法;学会编写Spark Streaming代码,整合其他组件解决实际问题5.1 SPARK核心机制本节介绍Spark的概念及主要构成组件、运行时的系统架构,并通过开发单词计数实例讲解Spark Shell的操作5.1.1 SPARK基本原理Spark是加州大学伯克利分校AMP实验室(Algorithms,Machines,and People Lab)开发的通用内存并行计算框架Spark提供了一个快速的计算、写入以及交互式查询的框架Spark使用in-memory的计算方式,以此避免一个MapReduce工作流中的多个任务对同一个数据集进行计算时的I/O瓶颈。
在保留MapReduce容错性、可扩展性等特性的同时,Spark还能保证高性能,避免磁盘I/O繁忙,主要原因是RDD(Resilient Distributed Dataset)内存抽象结构的创建5.1.1 SPARK基本原理Spark使用Scala语言进行实现,它是一种面向对象、函数式编程语言,能够像操作本地集合对象一样轻松地操作分布式数据集,具有以下特点:1)运行速度快:Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算官方提供的数据表明,如果数据由磁盘读取,速度是Hadoop MapReduce的10倍以上,如果数据从内存中读取,速度可以高达100多倍2)易用性好:Spark不仅支持Scala编写应用程序,而且支持Java、Python等语言进行编写,特别地,Scala是一种高效、可拓展的语言,能够用简洁的代码处理较为复杂的处理工作3)通用性强:Spark生态圈即BDAS(伯克利数据分析栈),包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等组件,这些组件分别用于处理Spark Core提供的内存计算框架、SparkStreaming的实时处理应用、Spark SQL的即席查询、MLlib或MLbase的机器学习和GraphX的图处理。
5.1.1 SPARK基本原理4)随处运行:Spark具有很强的适应性,能够读取HDFS、Cassandra、HBase、S3和Techyon,为持久层读写原生数据,能够以Mesos、YARN和自身携带的Standalone作为资源管理器调度job,以完成Spark应用程序的计算除此之外,Spark有一些常用术语,如下所示:RDD:弹性分布式数据集(Resilient Distributed Dataset)的简称,是分布式内存的抽象结构,提供了一种高度受限的共享内存模型DAG:有向无环图,反映RDD之间的依赖关系Application:Spark上的应用,即用户编写的Spark应用程序一个Application包含一个驱动器(Driver)和多个执行器(Executor)Driver Program:驱动器,即控制程序负责运行main方法及创建SparkContext进程Worker Node:工作节点,负责完成集群上应用程序的具体计算Executor:执行器,即运行在工作节点(Worker Node)上的一个进程负责运行计算任务,并为应用程序存储数据Task:任务,运行在执行器(Executor)上的工作单元,是其中的一个线程。
Cluster Manager:集群资源管理中心,负责分配计算资源Job:并行计算作业由一组任务(Task)组成,一个Job可以包含多个RDD及作用于相应RDD上的各种操作Stage:阶段,即作业的基本调度单位每个作业(Job)会划分为多组任务(Task),每组任务即阶段(Stage)5.1.2 SPARK系统架构Spark架构采用分布式计算中的Master-Slave模型,Master对应集群中含有Master进程的节点,Slave对应集群中含有Worker进程的节点Master作为整个集群的控制器,负责整个集群的正常运行;Worker则相当于计算节点,接收主节点命令并创建执行器(Executor)并行处理任务(Task);Driver负责应用的执行,即作业(Job)调度,任务(Task)分发;集群资源管理中心(Cluster Manager)负责分配整个集群的计算资源,架构如图5-1所示图5-1 Spark系统架构5.1.3 SPARK SHELL操作Spark-Shell是一个强大的交互式数据分析工具,初学者可以很好的使用它来学习相关API,用户可以在命令行下使用Scala编写Spark程序,并且每当输入一条语句,Spark-Shell就会立即执行并返回结果,Spark-Shell支持Scala和Python,如果需要进入Python语言的交互式执行环节,只需要执行“pyspark”命令即可。
首先,启动Hadoop和Spark集群,Spark集群部署见5.6.1节运行Spark-Shell应先切换到Spark安装目录的bin目录下,执行命令:bin/spark-shell -master 上述命令中,“-master”表示指定当前连接的Master节点,用于指定Spark的运行模式,可以省略如需查询spark-shell的更多使用方式可以执行“-help命令”获取帮助选项列表5.1.3 SPARK SHELL操作下面,运行一个实例开发单词计数程序:1)准备数据文件words.txt,文件内容如下读者需要在本地创建文件并上传至HDFS指定路径/spark/test下,如图5-2所示hello hadoophello sparkhello itcast图5-2 上传文件words.txt至HDFS5.1.3 SPARK SHELL操作2)执行start-dfs.sh命令启动Hadoop集群3)整合Spark与HDFSSpark加载HDFS上的文件,需要修改spark-env.sh配置文件,添加HADOOP_CONF_DIR配置参数,指定Hadoop配置文件的目录指定HDFS配置文件目录export HADOOP_CONF_DIR=/opt/modules/hadoop-2.8.2/etc/hadoop 4)重新启动Hadoop集群和Spark集群服务,使配置文件生效。
5.1.3 SPARK SHELL操作5)启动Spark-Shell编写程序启动Spark-Shell交互式界面,执行命令如下:bin/spark-shell master local2 /local表示本地模式运行,2表示启动两个工种线程Spark-Shell本身就是一个Driver,它会初始化一个SparkContext对象为“sc”,用户可以直接调用下面编写Scala代码实现单词计数,具体代码如下scalasc.textFile(“/spark/test/words.txt”).flatMap(_.split(“”).map(_,1).reduceByKey(_+_).collectres0:Array(String,Int)=Array(itcast,1),(hello,3),(spark,1),(hadoop,1)上述代码中,res0表示返回的结果对象,该对象中是一个Array(String,Int)类型的集合,(hello,3)则表示“hello”单词总计为3个6)退出Spark-Shell客户端scala:quit也可以使用快捷键5.2HIVE原理及实践本节介绍Hive的基本概念、架构体系以及具体的架构组件,并对常见的表分类和表操作进行讲解。
5.2.1 HIVE定义Hive是基于Hadoop的一个数据仓库工具,由Facebook开源,用于解决海量结构化日志的数据统计问题Hive可以将结构化的数据文件映射为一张表,并提供类SQL查询功能其本质是将HQL转化成MapReduce程序,体现在:1)Hive处理的数据存储在HDFS;2)Hive分析数据底层的实现是MapReduce;3)执行程序运行在Yarn上5.2.2 HIVE架构 Hive架构主要包括Client、Metastore和Hadoop,Hive运行于Yarn之上,其数据存储在HDFS上,架构原理图如图5-3所示图5-3 Hive架构图5.2.2 HIVE架构)用户接口:ClientCLI(hive shell)、JDBC/ODBC(java访问hive)、WEBUI(浏览器访问hive)(2)元数据:Metastore元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;默认存储在自带的derby数据库中,推荐使用MySQL存储Metastore3)Hadoop使用HDFS进行存储,使用MapReduce进行计算。
4)驱动器:Driver1)解析器(SQL Parser):将SQL字符串转换成抽象语法树AST,这一步一般都用第三方工具库完成,比如antlr;对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误2)编译器(Physical Plan):将AST编译生成逻辑执行计划3)优化器(Query Optimizer):对逻辑执行计划进行优化4)执行器(Execution):把逻辑执行计划转换成可以运行的物理计划对于Hive来说,就是MR/SparkHive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver并结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口5.2.3 HIVE表分类及查询操作1.表分类Hive的表由实际存储的数据和元数据组成实际数据一般存储于HDFS中,元数据一般存储于关系型数据库中Hive表有内部表、外部表、分区表、分桶表四种内部表:又叫受控表当表定义被删除时,HDFS上的数据以及元数据都会被删除外部表:数据存在与否和表定义互不约束当删除外部表时,HDFS上的数据不会被删除,但是元数据会被删除。
分区表:将一批数据分成多个目录来存储当查询数据时,Hive可以根据条件只查询指定分区的数据而无需全表扫描,提高查询效率分桶表:对数据进行哈希取值,并将不同数据放到不同文件中存储,每个文件对应一个桶可用于数据抽样,提高查询效率5.2.3 HIVE表分类及查询操作2.表操作由于 Hive 采用了类似SQL 的查询语言 HQL(Hive Query Language),因此很容易将 Hive 理解为数据库1)内部表1)创建表CREATE TABLE student(id INT,name STRING);执行上述命令,创建表student,其中字段id为整型,字段name为字符串在数据仓库目录中的test_db.db文件夹下会生成一个名为student的文件夹,即表student的数据存储目录2)查看表结构DESC student;执行上述命令,查看新创建的表student的表结构DESC FORMATTED student;执行带有FORMATTED的语句将显示详细表结构,包括表类型及在数据仓库的位置3)插入数据INSERT INTO student VALUES(1000,xiaoming);Hive会将insert插入语句转成MapReduce任务执行。
执行完成后,表中会多一条数据。