分享一个c#写的开源分布式消息队列equeue

上传人:枫** 文档编号:503991970 上传时间:2023-11-13 格式:DOCX 页数:10 大小:67.03KB
返回 下载 相关 举报
分享一个c#写的开源分布式消息队列equeue_第1页
第1页 / 共10页
分享一个c#写的开源分布式消息队列equeue_第2页
第2页 / 共10页
分享一个c#写的开源分布式消息队列equeue_第3页
第3页 / 共10页
分享一个c#写的开源分布式消息队列equeue_第4页
第4页 / 共10页
分享一个c#写的开源分布式消息队列equeue_第5页
第5页 / 共10页
点击查看更多>>
资源描述

《分享一个c#写的开源分布式消息队列equeue》由会员分享,可在线阅读,更多相关《分享一个c#写的开源分布式消息队列equeue(10页珍藏版)》请在金锄头文库上搜索。

1、前言本文想介绍一下前段时间在写enode时,顺便实现的一个分布式消息队列equeue。这个消息 队列的思想不是我想出来的,而是通过学习阿里的rocketmq后,自己用c#实现了一个轻量级 的简单版本。一方面可以通过写这个队列让自己更深入的掌握消息队列的一些常见问题;另一方 面也可以用来和en ode集成,为enode中的comma nd和domai n eve nt的消息传递提供支 持。目前在.net平台,比较好用的消息队列,最常见的是微软的MSMQ 了吧,还有像rabbitmq 也有.net的client端。这些消息队列都很强大和成熟。但当我学习了 kafka以及阿里的 rocketmq (

2、早期版本叫 metaq,自 metaq 3.0 后改名为 rocketmq)后,觉得 rocketmq 的设计思想深深吸引了我,因为我不仅能理解其思想,还有其完整的源代码可以学习。但是 rocketmq是java写的,且目前还没有.net的client端,所以不能直接使用(有兴趣的朋友 可以为其写一个.net的client端),所以在学习了 rocketmq的设计文档以及大部分代码后, 决定自己用c#写一个出来。项目中包含了队列的全部源代码以及如何使用的示例。也可以在enode项目中看到如何使用。EQUEUE 消 息 队 列中的 专业术语Topic一个topic就是一个主题。一个系统中,我们可

3、以对消息划分为一些topic,这样我们就能通过 topic,将消息发送到不同的queue。Queue一个topic下,我们可以设置多个queue,每个queue就是我们平时所说的消息队列;因为 queue是完全从属于某个特定的topic的,所以当我们要发送消息时,总是要指定该消息所属 的topic是什么。然后equeue就能知道该topic下有几个queue 了。但是到底发送到哪个 queue呢?比如一个topic下有4个queue,那对于这个topic下的消息,发送时,到底该发 送到哪个queue呢?那必定有个消息被路由的过程。目前equeue的做法是在发送一个消息时, 需要用户指定这个消息

4、对应的topic以及一个用来路由的一个object类型的参数。equeue会 根据topic得到所有的queue,然后根据该object参数通过hash code然后取模queue的个 数最后得到要发送的queue的编号,从而知道该发送到哪个queue。这个路由消息的过程是在 发送消息的这一方做的,也就是下面要说的producer。之所以不在消息服务器上做是因为这样 可以让用户自己决定该如何路由消息,具有更大的灵活性。Producer就是消息队列的生产者。我们知道,消息队列的本质就是实现了 publish-subscribe 的模式, 即生产者-消费者模式。生产者生产消息,消费者消费消息。所以

5、这里的Producer就是用来生 产和发送消息的。Consumer就是消息队列的消费者,一个消息可以有多个消费者。Consumer Group消费者分组,这可能对大家来说是一个新概念。之所以要搞出一个消费者分组,是为了实现下面 要说的集群消费。一个消费者分组中包含了一些消费者,如果这些消费者是要集群消费,那这些 消费者会平均消费该分组中的消息。Brokerequeue 中的 broker 负责消息的中转,即接收 producer 发送过来的消息,然后持久化消息到 磁盘,然后接收 consumer 发送过来的拉取消息的请求,然后根据请求拉取相应的消息给consumer。所以,broker可以理解

6、为消息队列服务器,www.ipb.cc提供消息的接收、存储、 拉取服务。可见,broker对于equeue来说是核心,它绝对不能挂,一旦挂了,那producer, consumer 就无法实现 publish-subscribe 了。集群消费集群消费是指,一个consumer group下的consumer,平均消费topic下的queue。具体如何平均可以看一下下面的架构图,这里先用文字简单描述一下。假如一个topic下有4个 queue,然后当前有一个consumer group,该分组下有4个consumer,那每个consumer 就被分配到该topic下的一个queue,这样就达到了

7、平均消费topic下的queue的目的。如果 consumer group下只有两个consumer,那每个consumer就消费2个queue。如果有3 个consumer,则第一个消费2个queue,后面两个每个消费一个queue,从而达到尽量平均 消费。所以,可以看出,我们应该尽量让consumer group下的consumer的数目和topic 的 queue 的数目一致或成倍数关系。这样每个 consumer 消费的 queue 的数量总是一样的, 这样每个 consumer 服务器的压力才会差不多。当前前提是这个 topic 下的每个 queue 里的消 息的数量总是差不多多的。

8、这点我们可以对消息根据某个用户自己定义的key来进行hash路 由来保证。广播消费广播消费是指一个consumer只要订阅了某个topic的消息,那它就会收到该topic下的所有 queue里的消息,而不管这个consumer的group是什么。所以对于广播消费来说,consumer group 没什么实际意义。 consumer 可以在实例化时,我们可以指定是集群消费还是广播消费。消费进度(offset)消费进度是指,当一个 consumer group 里的 consumer 在消费某个 queue 里的消息时, equeue是通过记录消费位置(offset)来知道当前消费到哪里了。以便该

9、consumer重启后继 续从该位置开始消费。比如一个topic有4个queue,一个 con sumer group有4个con sumer, 则每个consumer分配到一个queue,然后每个consumer分别消费自己的queue里的消息。 equeue会分另U记录每个consumer对其queue的消费进度,从而保证每个consumer重启 后知道下次从哪里开始继续消费。实际上,也许下次重启后不是由该consumer消费该queue 了,而是由group里的其他consumer消费了,这样也没关系,因为我们已经记录了这个queue 的消费位置了。所以可以看出,消费位置和consume

10、r其实无关,消费位置完全是queue的一 个属性,用来记录当前被消费到哪里了。另外一点很重要的是,一个topic可以被多个consumer group里的consumer订阅。不同consumer group里的consumer即便是消费同一个topic 下的同一个queue,那消费进度也是分开存储的。也就是说,不同的consumer group内的 consumer 的消费完全隔离,彼此不受影响。还有一点就是,对于集群消费和广播消费,消费 进度持久化的地方是不同的,集群消费的消费进度是放在broker,也就是消息队列服务器上的, 而广播消费的消费进度是存储在consumer本地磁盘上的。之所

11、以这样设计是因为,对于集群 消费,由于一个queue的消费者可能会更换,因为consumer group下的consumer数量可 能会增加或减少,然后就会重新计算每个con sumer该消费的queue是哪些,这个能理解的把? 所以,当出现一个 queue 的 consumer 变动的时候,新的 consumer 如何知道该从哪里开始 消费这个 queue 呢?如果这个 queue 的消费进度是存储在前一个 consumer 服务器上的,那 就很难拿到这个消费进度了,因为有可能那个服务器已经挂了,或者下架 了,都有可能。而因为 broker 对于所有的 consumer 总是在服务的,所以,

12、在集群消费的情 况下,被订阅的 topic 的 queue 的消费位置是存储在 broker 上的,存储的时候按照不同的 consumer group做隔离,以确保不同的consumer group下的consumer的消费进度互补 影响。然后,对于广播消费,由于不会出现一个queue的consumer会变动的情况,所以我们 没必要让broker来保存消费位置,所以是保存在consumer自己的服务器上。EQUEUE 是 什 么 ?OOOProducerO1TOPIC_AConsumerConsumerAConsumerProducer、TOPIC_B 通过上图,我们能直观的理解equeue。

13、这个图是从rocketmq的设计文档中拿来的,呵呵。 由于equeue的设计思想完全和rocketmq 一致,所以我就拿过来用了。每个producer可以 向某个topic发消息,发送的时候根据某种路由策略(producer可自定义)将消息发送到某个 特定的queue。然后consumer可以消费特定topic下的queue里的消息。上图中,TOPIC_A 有两个消费者,这两个消费者是在一个group里,所以应该平均消费TOPIC_A下的queue但 由于有三个queue,所以第一个consumer分到了 2个queue,第二个consumer分到了 1 个。对于TOPIC_B,由于只有一个消

14、费者,那TOPIC_B下的所有queue都由它消费。所有 的topic信息、queue信息、还有消息本身,都存储在broker服务器上。这点上图中没有体现 出来。上图主要关注 producer,consumer,topic,queue 这四个东西之间的关系,并不关注物 理服务器的部署架构。关键问题的思考1.producer,broker,consumer三 者 之 间如 何 通 信由于是用c#实现,且因为一般是在局域网内部署,为了实现高性能通信,我们可以利用异步 socket来通信。.net本身提供了很好的异步socket通信的支持;我们也可以用zeromq来实 现高性能的 socket 通信

15、。本来想直接使用 zeromq 来实现通信模块就好了,但后来自己学习了 一下.net自带的socket通信相关知识,发现也不难,所以就自己实现了一个,呵呵。自己实现 的好处是我可以自己定义消息的协议,目前这部分实现代码在 ecommon 基础类库中,是一个 独立的可服用的与业务场景无关的基础类库。有兴趣的可以去下载下来看看代码。经过了自己的 一些性能测试,发现通信模块的性能还是不错的。一台broker,四台producer同时向这个 broker发送消息,每秒能发送的消息4W没有问题,更多的producer还没测试。2.消息如何持久化消息持久化方面主要考虑的是性能问题,还有就是消息如何快速的读

16、取。1. 首先,一台broker上的消息不需要一直保存在该broker服务器上,因为这些消息总会被 消费掉。根据阿里rocketmq的设计,默认会1天删除一次已经被消费过的消息。所以,我们 可以理解,broker上的消息应该不会无限制增长,因为会被定期删除。所以不必考虑一台broker 上消息放不下的问题。2. 如何快速的持久化消息?一般来说,我觉得有两种方式:1)顺序写磁盘文件; 2)用现成的 key,value的nosql产品来存储;rocketmq目前用的是自己写文件的方式,这种方式的难点 是写文件比较复杂,因为所有消息都是顺序append到文件末尾,虽然性能非常高,但复杂度 也很高;比如所有消息不能全写在一个文件里,一个文件到达一定大小后需要拆分,一旦拆分就 会产生很多问题,呵呵。拆分后如何读取也是比较复杂的问题。还有由于是顺序写入文件的,那 我们还需要把每

展开阅读全文
相关资源
相关搜索

当前位置:首页 > 学术论文 > 其它学术论文

电脑版 |金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号