Spark框架集成应用之Helloworld

上传人:油条 文档编号:20328189 上传时间:2017-11-21 格式:DOCX 页数:4 大小:84.64KB
返回 下载 相关 举报
Spark框架集成应用之Helloworld_第1页
第1页 / 共4页
Spark框架集成应用之Helloworld_第2页
第2页 / 共4页
Spark框架集成应用之Helloworld_第3页
第3页 / 共4页
Spark框架集成应用之Helloworld_第4页
第4页 / 共4页
亲,该文档总共4页,全部预览完了,如果喜欢就下载吧!
资源描述

《Spark框架集成应用之Helloworld》由会员分享,可在线阅读,更多相关《Spark框架集成应用之Helloworld(4页珍藏版)》请在金锄头文库上搜索。

1、Spark 框架集成应用之 Helloworld前言:经过几天的学习和探索,终于把 Spark 框架的几个模块集成起来了,从 SparkStream 读取 kafka 消息流,并解析消息构建图,同时结合 Pregel shortpath 算法计算最短路径,最 后将计算结果保存至数据库。虽然是 helloworld 级别的程序,但麻雀 虽小,五脏俱全,当然,千里之行始于足下!今天把这几天来的学习心得整理一下,分享给所有 spark 的爱好者,当然也许有很多的 不足,就算是抛砖引玉吧!一、 概述该实验是基于一个简化版,微缩版的智能交通场景来实现的,通过构建一张道路路况图,结合实时的路况信息,找出

2、A 路口到 B 路口所有交通路口之间的最短路径。首先假设各个交通路口为图的顶点,各个道路的路长和拥堵状况为图的边的属性。假设静态的图顶点数据存在磁盘文件中,动态的路况通过接收 kafka 实时消息流的方式实现。最后,将计算的结果存入数据库。总体的框架图如下:上图中,该实验暂时只实现接收消息并计算最短路径和持久化到数据库的功能。 (其他外围的可考虑在下个实验中实现)二、 运行环境及准备工作1、 安装 Scala2、 安装 Spark具体安装过程可以参考 spark 官方网站或网上资料,但有一点需要注意的是,该实验中 Graphx 只支持 spark-1.0.2 版本,暂不支持 spark-1.1

3、.0 版本。 3、 安装 Kafka参考官方网站4、 安装 Mysql参考网络资料三、 编码与代码说明由于这只是一个小实验,因此采用一刀切的方法,从消息读取,解析,存储等都在一个 scala object 中实现。1、 读取顶点数据2、 读取边数据3、 解析 Json 边数据4、 构建图val sparkHome = System.getenv(SPARK_HOME)val vertexs : RDD(VertexId,(String) = sc.textFile(sparkHome + /test/vdata.txt).map(line = line.split(s+).map( v = (

4、v(0).toLong : VertexId,( v(1).toString : String)val Array(zkQuorum, group, topics, numThreads) = argsval topicpMap = topics.split(,).map(_,numThreads.toInt).toMapval lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap,StorageLevel.MEMORY_ONLY).map(_._2)val eo = lines.flatMap(s = s.split(

5、,).map(n = n.split(:).map(e = e(1).toString.split(,).map(t = t.split(:).map(l = EdgeObj(e(0).toString.replace(, ),l(0).toString,l(1).toString.replace(, ).toDouble)eo.foreachRDD rd = val edges :RDDEdgeDouble = sc.parallelize(rd.flatMap(ea = ea.map(e = Edge(e.node1.toLong,e.node2.toLong,e.length).toAr

6、ray)这里 edges 必须为ParallelCollectionRDD,因此必须parallerlize 化val graph = Graph(vertexs, edges, defaultVertex)graph.persist(StorageLevels.MEMORY_AND_DISK)因为下面还需要用到 graph,因此建议将它persist 起来5、 计算最短路径6、 保存数据库vertexs.collect.map case(k,v) = k.foreachid = val sourceId = id.toLongval initialGraph = graph.mapVerti

7、ces(id, _) = if (id.toLong = sourceId) 0.0 else Double.PositiveInfinity)val sssp = initialGraph.pregel(Double.PositiveInfinity)(id, dist, newDist) = math.min(dist, newDist), / Vertex Programtriplet = / Send Messageif (triplet.srcAttr + triplet.attr math.min(a,b) / Merge Message)println(shortpath:+ss

8、sp.vertices.collect.mkString(n)val conn = getConnection()val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)sssp.vertices.collect.foreach t =if(t._2.toDouble = Double.PositiveInfinity )insert(sourceId,t._1 ,999999D ,conn)elseinsert(sourceId,t._1 ,t._2 ,conn)

9、def insert(start : Long, target : Long , length : Double , conn : Connection) : Unit = val prep = conn.prepareStatement(INSERT INTO path (start, target,length) VALUES (?, ?, ?) )prep.setLong(1, start)prep.setLong(2, target)prep.setDouble(3, length)prep.executeUpdategetConnect 根据不同的数据库创建连接,可参考 scala

10、数据库的连接方式Mysql 的数据库连接:四、 开发中遇到的问题与解决方法曾经在几个地方栽了跟头,折腾了很久,现将其罗列出来,希望大家可以少走一些弯路。1、 RDD 的 foreach 和 foreachRDD 操作不会改变外部变量的值如以下代码中 inner foreach 中 target 有值,但 foreach 外 target 的值则为空。解决方法:在 map 之前加上 collect(虽 collect 需慎用,因为很耗时,但有时也不得不用)另外参考官方的说明:if your application does not have any output operation, or ha

11、s output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.2、 Build graph 时,edges,vertexs RDD 必须为 ParallelCollectionRDD,否则如 MappedRDD 虽编译时不会报错,但运行时会出错。def getConnection() : Connection = val

12、dbc = jdbc:mysql:/localhost:3306/test?user=root&password=rootclassOfcom.mysql.jdbc.Driverval conn = DriverManager.getConnection(dbc)connval sparkHome = System.getenv(SPARK_HOME)val vertexs : RDD(VertexId,(String) = sc.textFile(sparkHome + /test/vdata.txt).map(line = line.split(s+).map( v = (v(0).toL

13、ong : VertexId,( v(1).toString : String)val target = new mutable.HashMapString,Long()vertexs.foreachv = target += (v._2.toString - v._1.toLong )target.map(t = =inner foreach= + t._1 +: + t._2 ).foreach(println)target.map(t = =outof foreach= + t._1 +: + t._2 ).foreach(println)vertexs.collect.foreachv =

展开阅读全文
相关资源
正为您匹配相似的精品文档
相关搜索

最新文档


当前位置:首页 > 行业资料 > 其它行业文档

电脑版 |金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号