数据采集与预处理(共9章)-第4章-分布式消息系统Kafka

举报
资源描述
第第4章章 分布式消息系统分布式消息系统Kafka4.1.1Kafka的特性Kafka具有以下良好的特性:高吞吐量、低延迟:Kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;可扩展性:Kafka集群具有良好的可扩展性;持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份,防止数据丢失;容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);高并发:支持数千个客户端同时读写。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃;顺序保证:在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个分区内的消息的有序性;异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。4.1.2Kafka的应用场景Kafka的主要应用场景包括:日志收集:一个公司可以用Kafka收集各种日志,这些日志被Kafka收集以后,可以通过Kafka的统一接口服务开放给各种消费者,例如Hadoop、HBase、Solr等;消息系统:可以对生产者和消费者实现解耦,并可以缓存消息;用户活动跟踪:Kafka经常被用来记录Web用户或者APP用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到Kafka的主题(Topic)中,然后订阅者通过订阅这些主题来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;运营指标:Kafka也经常用来记录运营监控数据,包括收集各种分布式应用的数据,生产环节各种操作的集中反馈,比如报警和报告;流式处理:Kafka实时采集的数据可以传递给流处理框架(比如SparkStreaming和Storm)进行实时处理。4.1.3Kafka的消息传递模式一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。对于消息系统而言,一般有两种主要的消息传递模式:点对点传递模式和发布订阅模式。大部分的消息系统选用发布订阅模式。Kafka就是一种发布订阅模式。1.点对点消息传递模式点对点消息传递模式在点对点消息系统中(如图4-1所示),消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。图4-1点对点消息系统的架构2.发布订阅消息传递模式发布订阅消息传递模式在发布订阅消息系统中(如图4-2所示),消息被持久化到一个主题(topic)中。与点对点消息系统不同的是,消费者可以订阅一个或多个主题,消费者可以消费该主题中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布订阅消息系统中,消息的生产者称为“发布者”,消费者称为“订阅者”。图4-2发布订阅消息传递模式的架构4.2Kafka在大数据生态系统中的作用如图4-3所示,在公司的大数据生态系统中,可以把Kafka作为数据交换枢纽,不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统、批处理系统等),可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实时高效交换,较好地满足各种企业应用需求。同时,借助于Kafka作为交换枢纽,也可以很好解决不同系统之间的数据生产/消费速率不同的问题。比如在线上实时数据需要写入HDFS的场景中,线上数据不仅生成速率快,而且具有突发性,如果直接把线上数据写入HDFS,可能会导致高峰时间HDFS写入失败,在这种情况下,就可以先把线上数据写入Kafka,然后借助于Kafka导入到HDFS。4.2Kafka在大数据生态系统中的作用图4-3Kafka作为数据交换枢纽4.3Kafka与Flume的区别与联系Kafka与Flume的很多功能确实是重叠的,二者的联系与区别如下:(1)Kafka是一个通用型系统,可以有许多的生产者和消费者分享多个主题。相反地,Flume被设计成特定用途的工作,特定地向HDFS和HBase发送数据。Flume为了更好地为HDFS服务而做了特定的优化,并且与Hadoop的安全体系整合在了一起。因此,如果数据需要被多个应用程序消费的话,推荐使用Kafka,如果数据只是面向Hadoop的,可以使用Flume。(2)Flume拥有许多配置的数据源(source)和数据槽(sink),而Kafka拥有的是非常小的生产者和消费者环境体系。如果数据来源已经确定,不需要额外的编码,那么可以使用Flume提供的数据源和数据槽,反之,如果需要准备自己的生产者和消费者,那么就需要使用Kafka。(3)Flume可以在拦截器里面实时处理数据,这个特性对于过滤数据非常有用。Kafka需要一个外部系统帮助处理数据。(4)无论是Kafka或是Flume,两个系统都可以保证不丢失数据。(5)Flume和Kafka可以一起工作。Kafka是分布式消息中间件,自带存储,更合适做日志缓存,Flume数据采集部分做得很好,可以使用Flume采集日志,然后,把采集到的日志发送到Kafka中,再由Kafka把数据传送给Hadoop、Spark等消费者。4.4Kafka相关概念Kafka是一种高吞吐量的分布式发布订阅消息系统,为了更好地理解和使用Kafka,这里介绍一下Kafka的相关概念:Broker:Kafka集群包含一个或多个服务器,这些服务器被称为“Broker”。Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为“Topic(主题)”。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic,即可生产或消费数据,而不必关心数据存于何处。Partition:是物理上的概念,每个Topic包含一个或多个Partition。Producer:负责发布消息到KafkaBroker。Consumer:消息消费者,向KafkaBroker读取消息的客户端。ConsumerGroup:每个Consumer属于一个特定的ConsumerGroup,可为每个Consumer指定GroupName,若不指定GroupName,则属于默认的Group。同一个Topic的一条消息只能被同一个ConsumerGroup内的一个Consumer消费,但多个ConsumerGroup可同时消费这一消息。4.4Kafka相关概念图4-4给出了Kafka的总体架构。一个典型的Kafka集群中包含若干Producer、若干Broker、若干Consumer以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置。Producer使用push模式将消息发布到Broker,Consumer使用pull模式从Broker订阅并消费消息。图4-4Kafka总体架构4.4Kafka相关概念图4-5描述了Kafka中的Topic与其他组件的关系。Producer在发布消息时,会发布到特定的Topic,Consumer是从特定的Topic获取消息。每个Topic包含一个或多个Partition。图4-5Kafka中的Topic4.5Kafka的安装和使用4.5.1安装Kafka4.5.2使用Kafka4.5.1安装KafkaKafka的运行需要Java环境的支持,因此,需要在Windows系统中安装JDK。请参照第2章内容完成JDK的安装。访问Kafka官网(http:/kafka.apache.org/downloads),下载Kafka2.4.0版本的安装文件kafka_2.12-2.4.0.tgz,解压缩到“C:”下。因为Kafka的运行需要依赖于Zookeeper,因此,需要下载并安装Zookeeper。当然,Kafka也内置了Zookeeper服务,因此,也可以不用额外安装Zookeeper,而是直接使用内置的Zookeeper服务。为了简单起见,这里直接使用Kafka内置的Zookeeper服务。4.5.2使用Kafka在Windows系统中打开第1个cmd窗口,启动Zookeeper服务:cdc:kafka_2.12-2.4.0.binwindowszookeeper-server-start.bat.configzookeeper.Properties注意,执行上面命令以后,cmd窗口会返回一堆信息,然后就停住不动了,没有回到命令提示符状态,这时,不要误以为死机了,而是Zookeeper服务器已经启动,正在处于服务状态。所以,不要关闭这个cmd窗口,一旦关闭,Zookeeper服务就停止了。4.5.2使用Kafka打开第2个cmd窗口,然后输入下面命令启动Kafka服务:cdc:kafka_2.12-2.4.0.binwindowskafka-server-start.bat.configserver.properties执行上面命令以后,如果启动失败,并且出现提示信息“此时不应有QuickTimeQTSystemQTJava.zip”,则需要把CLASSPATH环境变量的相关信息删除,具体方法是:右键点击“我的电脑”-“高级系统设置”-“环境变量”,然后,找到CLASSPATH环境变量,把类似如下的信息删除:C:ProgramFiles(x86)QuickTimeQTSystemQTJava.zip然后重新启动计算机,让配置修改生效。重新启动计算机以后,再次按照上面方法启动Zookeeper和Kafka。执行上面命令以后,如果启动成功,cmd窗口会返回一堆信息,然后就会停住不动,没有回到命令提示符状态,这时,同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。所以,不要关闭这个cmd窗口,一旦关闭,Kafka服务就停止了。4.5.2使用Kafka为了测试Kafka,这里创建一个主题(Topic),名称为“topic_test”,包含一个分区,只有一个副本,在第3个cmd窗口中执行如下命令:cdc:kafka_2.12-2.4.0.binwindowskafka-topics.bat-create-zookeeperlocalhost:2181-replication-factor1-partitions1-topictopic_test可以继续执行如下命令,查看topic_test是否创建成功:.binwindowskafka-topics.bat-list-zookeeperlocalhost:2181如果创建成功,就可以在执行结果中看到topic_test。4.5.2使用Kafka继续在第3个cmd窗口中执行如下命令创建一个生产者来产生消息:.binwindowskafka-console-producer.bat-broker-listlocalhost:9092-topictopic_test该命令执行以后,屏幕上的光标会一直在闪烁,这时,就可以用键盘输入一些内容,比如输入:IloveKafkaKafkaisgood新建第4个cmd窗口,执行如下命令来消费消息:cdc:kafka_2.12-2.4.0.binwindowskafka-console-consumer.bat-bootstrap-serverlocalhost:9092-topictopic_test-from-beginning该命令执行以后,就会在屏幕上看到刚才输入的语句“IloveKafka”和“Kafkaisgood”。4.6使用Python操作Kafka使用Python操作Kafka之前,需要安装第三方模块python-kafka,命令如下:pipins
展开阅读全文
温馨提示:
金锄头文库所有资源均是用户自行上传分享,仅供网友学习交流,未经上传用户书面授权,请勿作他用。
相关搜索

当前位置:首页 > 高等教育 > 大学课件


电脑版 |金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号