尚硅谷大数据项目之实时项目4

上传人:hs****ma 文档编号:496235888 上传时间:2023-06-26 格式:DOCX 页数:15 大小:296.23KB
返回 下载 相关 举报
尚硅谷大数据项目之实时项目4_第1页
第1页 / 共15页
尚硅谷大数据项目之实时项目4_第2页
第2页 / 共15页
尚硅谷大数据项目之实时项目4_第3页
第3页 / 共15页
尚硅谷大数据项目之实时项目4_第4页
第4页 / 共15页
尚硅谷大数据项目之实时项目4_第5页
第5页 / 共15页
点击查看更多>>
资源描述

《尚硅谷大数据项目之实时项目4》由会员分享,可在线阅读,更多相关《尚硅谷大数据项目之实时项目4(15页珍藏版)》请在金锄头文库上搜索。

1、第1章需求分析1.1简介实时预警,是一种经常出现在实时计算中的业务类型。根据日志数据中系统报错异常, 或者用户行为异常的检测,产生对应预警日志。预警日志通过图形化界面的展示,可以提醒 监控方,需要及时核查问题,并采取应对措施。1.2需求说明需求:同一设备,5分钟内三次及以上用不同账号登录并领取优惠券,并且在登录到领 券过程中没有浏览商品。达到以上要求则产生一条预警日志。1.3 预:日志格式同一设备,每分钟只记录一次预警。mid设备iduids领取优惠券登录过的uiditemIds优惠券涉及的商品idevents发生过的行为ts发生预警的时间戳第2章整体流程设计2.1框架流程springboot

2、Kafka clusterElasticSearchSpark streamingspringbootspringbootnglnx2.2开发思路1)从kafka中消费数据,根据条件进行过滤筛选,生成预警日志;2)预警日志保存到ElasticSearch中;3)利用Kibana快速搭建可视化图形界面。第3章实时计算模块3.1筛选条件分析同一设备(分组)5分钟内(窗口)三次不同账号登录(用户)领取优惠券(行为)没有浏览商品(行为)同一设备每分钟只记录一次预警(去重)3.2数据处理流程图3.3代码开发3.3.1事件日志样例类-Eventinfocase class EventInfo(mid:St

3、ring, uid:String, appid:String, area:String, os:String, ch:String, type:String, evid:String, pgid:String, npgid:String, itemid:String, var logDate:String, var logHour:String, var ts:Long)3.3.2预警日志样例类-CouponAlertInfocase class CouponAlertInfo(mid:String,uids:java.util.HashSetString,itemIds:java.util.

4、HashSetString, events:java.util.ListString, ts:Long)3.3.3预警业务类一AlertAppimport com.alibaba.fastjson.JSONimport com.atguigu.gmall.constant.GmallConstantsimportcom.atguigu.gmall2019.realtime.bean.CouponAlertInfo,EventInfoimport com.atguigu.gmall2019.realtime.util.MyEsUtil, MyKafkaUtil import org.apache

5、.kafka.clients.consumer.ConsumerRecordimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.DStream, InputDStream import org.apache.spark.streaming.Seconds, StreamingContextimport scala.util.control.Breaks._object AlertApp def main(args: ArrayString): Unit = valsparkConf:SparkCo

6、nf=newSparkConf().setMaster(local*).setAppName(event_app)val ssc = new StreamingContext(sparkConf,Seconds(5)val inputDstream: InputDStreamConsumerRecordString, String = MyKafkaUtil.getKafkaStream(GmallConstants.KAFKA_TOPIC_EVENT,ssc)/1格式转换成样例类val eventInfoDstream: DStreamEventInfo = inputDstream.map

7、 record =val jsonstr: String = record.value()val eventInfo:EventInfo = JSON.parseObject(jsonstr,classOfEventInfo)eventInfo/2开窗口val eventInfoWindowDstream:DStreamEventInfo =eventInfoDstream.window(Seconds(30),Seconds(5)/3同一设备分组val groupbyMidDstream: DStream(String, IterableEventInfo) = eventInfoWindo

8、wDstream.map(eventInfo=(eventInfo.mid,eventInfo). groupByKey()/4判断预警/在一个设备之内/1三次及以上的领取优惠券(evid coupon)且uid都不相同/2没有浏览商品(evid clickItem)val checkCouponAlertDStream: DStream(Boolean, CouponAlertInfo) =groupbyMidDstream.map case (mid, eventInfoItr)=val couponUidsSet = new util.HashSetString()val itemIds

9、Set = new util.HashSetString() val eventIds = new util.ArrayListString() var notClickItem: Boolean = true breakable(for (eventInfo: EventInfo =3& notClickItem,CouponAlertInfo(mid, couponUidsSet, itemIdsSet, eventIds, System.currentTimeMillis() /过滤 val filteredDstream: DStream(Boolean, CouponAlertInf

10、o) = checkCouponAlertDStream.filter_._1/增加一个id用于保存到es的时候进行去重操作val alertInfoWithIdDstream: DStream(String, CouponAlertInfo) = filteredDstream.map case (flag, alertInfo) = val period: Long = alertInfo.ts / 1000L / 60L val id: String = alertInfo.mid + _ + period.toString (id, alertInfo) alertInfoWithId

11、Dstream.foreachRDDrdd= rdd.foreachPartitionalertInfoWithIdIter= MyEsUtil.insertBulk(GmallConstants.ES_INDEX_COUPON_ALERT ,alertIn foWithIdIter.toList) ssc.start() ssc.awaitTermination()第4章ElasticSearch的保存4.1 ES集群搭建参考ElasticSearch集群安装手册4.2 ES上建好索引其实即使不提前建立索引,ES也是可以将数据保存进去的。这种情况,ES会根据第一 条要插入的数据进行推断,但是

12、ES的这种推断往往不够准确。比如:要区分字段要不要进行索引,字段要不要进行分词,如果分词选用哪个分词器等 等。建立索引语句(包含Mapping)PUT gmall_coupon_alertmappings: _doc”:properties:mid:type:keyword,uids:type:keyword,itemIds:type:keyword,events:type:keyword,ts:type:date4.3保存ES4.3.1 pom.xmlio.searchboxjest5.3.3net.java.dev.jnajna4.5.2org.codehaus.janinocommons

13、-compiler2.7.8 4.3.2保存ES的工具类import java.utilimport java.util.Objectsimport io.searchbox.client.JestClient, JestClientFactoryimport io.searchbox.client.config.HttpClientConfig import io.searchbox.core.Bulk, BulkResult, Index import collection.JavaConversions._object MyEsUtil private val ES_HOST = http:/hadoop102”private val ES_HTTP_PORT = 9200private var fac

展开阅读全文
相关资源
相关搜索

当前位置:首页 > 学术论文 > 其它学术论文

电脑版 |金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号