`
herman_liu76
  • 浏览: 96658 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

rocketmq源码分析、整体结构与类关系设计的思考

阅读更多
前言:

    本文目的:

       
  • 一个复杂的中间件是如何从整体上分模块以及设计核心类之间关系
  •    
  • 一些重要场景的设计分析与设计模式使用
  •    
  • 有哪些抽取的公共模块,公共底层的东西及如何重用


一、简介:

    RocketMQ是一款分布式、队列模型的消息中间件,是阿里巴巴集团自主研发的专业消息中间件,借鉴参考了JMS规范的MQ实现,更参考了优秀的开源消息中间件KAFKA,实现了业务消峰、分布式事务的优秀框架。
    产品基于高可用分布式集群技术,提供消息订阅和发布、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。MQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具备海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。
    本文通过研究这样一个复杂中间件的代码,复盘一个复杂系统的整体设计思路与技巧。网上看单独分析其中部分技术的文章不少,但如果自己从头设计,怎么拆分模块,怎么选择方案?因为时间仓促且能力有限,如有不正确的欢迎指正。
    本文基于4.5.1版本!

二、特点:

2.1 真正分布式系统
    一般的服务都经历这样的过程:
    单机->主从架构->高可用主从架构(自动选主)->分片并备份的分布式集群
    rocketmq是真正的分布式消息队列中间件。

2.2 速度快
    说是参考了kafka,比如顺序写消息到文件,在写消息同时另外构建轻量的队列与消息索引。而有的系统因为对数据要进行复杂处理,所以要单独写blog之类的文件,这里就不需要了。

2.3 局部主从结构
    与kafka不同,这里的broker不是完全对等的,分为多个主broker,每个broker带多个从broker,形成一个小团体,只同步对应的主broker的数据。与redis-cluster类似,主从小组合,组合内部自己切换,降低了复杂性。但elasticsearch的分片又与kafka相似。

2.4 简化的命名服务器集群
    分布式系统一定要有一个配置命名服务,这里没有用zookeeper,也没用其它类似的现成产品,而是相对简化的命名服务。


三、主要结构

3.1 系统结构



1) Name Server
    Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

2) Broker
    Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

    每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

3) Producer
    Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

    Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

    Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

4) Consumer
   Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

3.2 源码结构

   源码中的maven模块按如下划分。其中client包含了consumer和producer,而broker中的存贮功能由于相对复杂又独立成模块,remoting是基于netty的公共通讯模块,整体上还是很清晰的。

rocketmq-broker:服务端,接受消息,存储消息,consumer拉取消息

rocketmq-client:消息发送和接收,包含consumer和producer

rocketmq-common:通用的枚举、基类方法、或者数据结构,包名有admin、consumer、filter、hook、message

rocketmq-distribution:脚本、配置模块,RocketMQ编译时,bin目录,benchmark目录,conf目录都是从这个模块编译输出的

rocketmq-example:示例模块

rocketmq-filter:消息过滤器

rocketmq-logappender:日志

rocketmq-logging:日志

rocketmq-namesrv:NameServer,类似服务注册中心,broker在这里注册,consumer和producer在这里找到broker地址

rocketmq-openmessaging:RocketMQ支持openmessaging,详见:http://openmessaging.cloud

rocketmq-remoting:使用netty的客户端、服务端,使用fastjson序列化,自定义二进制协议

rocketmq-srvutil:只有一个ServerUtil类,只提供Server程序依赖,尽可能减少客户端依赖

rocketmq-store:消息存储,索引,consumerLog,commitLog等

rocketmq-tools:命令行工具


四、类关系与设计分析

    从这个总图正式开局,后面开始正式分析。



    核心业务模块三个,初看内部的核心类很有层次感,使用通用的底层通讯模块,下面重点介绍client与broker的类关系图与思考。

4.1. client模块



此图分析:
    1)从MQClientManager到MQClientInstance 再到DefaultMQProducerImpl DefaultMQPullConsumer有比较明确的包含关系。因为业务上可能一个应用要对接多个MQ,对一个MQ可能生产多种消息,也可能消费消息。
    2)MQClientAPIImpl统一处理与外界的远程调用的功能API,把客户端公共数据,发送者,消费者的请求都放在这里,同时这里还负责处理外部对客户端的请求的处理。为何这些API不分成三个类呢?可能不需要太细吧,也许API中有些公共的东西。
    3)为何clientRemotingProcessor由MQClientInstance产生,并传递给MQClientAPIImpl,再传递给remotingClient?remotingClient是另外的底层公共通讯模块,各自己的处理当然是传过来的用的。但为啥不让MQClientAPIImpl产生而要传过来?也许MQClientAPIImpl使用时,使用MQClientInstance中的数据或工具。
常见的上级对象产生下级使用的对象时,会把this传进去,实现上下级相互直接引用。MQClientInstance产生MQClientAPIImpl时不传自己this,但是在产生(new)处理器时有new ClientRemotingProcessor(this),而产生new PullMessageService(this) new RebalanceService(this)也是这样。
    说明引用关系不宜过多过乱,最好有一个引用核心。MQ客户端调用自己的API接口合理,但API接口反调用MQ客户端直觉上没必要,API要的处理器,MQ客户端给它,处理器中会用到客户端。说明API也是会用到MQ客户端,但就是不直接引用。这样符合职责规范。有点像上级安排下级任务,派个钦即可,下级有事不用汇报上级,调用钦差即可,职责明确。
    4)说到引用核心,MQClientInstance是整个MQ客户端的核心。当DefaultMQProducerImpl发消息时,是this.mQClientFactory.getMQClientAPIImpl().sendMessage(…),当生成topic时,是this.mQClientFactory.getMQAdminImpl().createTopic(…)。当DefaultMQPullConsumer拉消息时,是this.mQClientFactory.getMQClientAPIImpl().pullMessage(…)。都是通过mQClientFactory(MQClientInstance)找到API类,而它们都不直接引用API类操作。也是把引用关系按职责梳理清晰。清晰的引用关系说明开发人员头脑清楚,否则改动维护就会是一团乱麻。我看到过类职责不清,想用哪个类功能就直接引用注入,不考虑整体上的规划,不考虑是否通过核心类使用那个类。

设计细节:
    1)异步发送线程池设计。队列要有最大数量限制,不可无界asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000)。new ThreadPoolExecutor的参数要配置好,特别是高并发主业务的线程数与Runtime.getRuntime().availableProcessors()处理器数相关。线程池中线程工厂中要命名new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
    2)remotingClient是通用的底层rpc客户端。既然是通用的,具体业务处理逻辑由上层提供。RemotingClient.registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor)中还可以提供线程池。不提供就用内部的publicExecutor。通用层的任务就是产生一个runnable任务,具体是根据req.code找到处理器来处理。这个runnable对象扔了到传过来的线程池来处理。这样对不同的任务,由上层决定线程池的个性化配置。
    3)Hook设计。doBeforeRpcHooks/ doAfterRpcHooks futurn与callback是常用的,但hook还是用的少。我理解在一个调用过程中,特别是重要的调用,要考虑周全的话,是要可以在正式处理的前后,插入额外的处理过程,以实现最大的灵活性。或者把非主流流程放进去,主线更清晰。这总方式类比如servlet里的filter,spring 中的interceptor,在springContext构建的bean每一个环节,比如bean产生前后,属性设置前后,init与dispose时,都可以插入一些灵活的个性的东西。具体是什么交给外部,自己要实现这个机制。
    具体实现比如:SendMessageHook的实现类SendMessageTraceHookImpl,在发送消息前后,在发送消息上下文中加入了TraceContext。又比如RPCHook的实现类AclClientRPCHook,在doBeforeRequest中会额外加入控制信息。


4.2. broker模块



此图分析:
    1) 在broker模块,BrokerController是其中的核心类,BrokerController引用很多类,从功能上的复杂度上看,DefaultMessageStore又是BrokerController下一级的核心类,commitLog又是DefaultMessageStore下一级的核心类。核心类之间围绕着众多的其它功能类为其服务。对外通讯的nettyClient与nettyServer都由BrokerController所引用的类负责。整体上层次感非常清晰,核心类周围的从类众星捧月一般,但从类之间不相互引用,如果需要都通过核心类引用来操作。
    2) 关于数据存贮与高可用,有一个Dledger实现了所谓的openmessagingAPI,内部是。BrokerController根据配置的其中isEnableDLegerCommitLog用来判断是否使用DLeger,默认false是关闭的。在默认情况下使用CommitLog + HAService。使用Dleger时,因为都交给它处理数据记录与选举leader了,但上层还要对其内部的变化感知并处理,比如上报主从变化 给nanmeServer,所以有DledgerRoleChangeHandler类配置给Dleger,等于是监听者,它最重要的引用就是核心的BrokerController。监听者内部得到信息后,产生一个runnable对象,放在自己线程池中处理。
    3) 为了保证各种类型的消息快速存贮,消息本身使用顺序编号的文件,始终加在文件尾部,超过设定后另建新文件存。而另有一个服务reputMessageService不断循环对消息建消费队列,以及建消息索引。都是产生每一个dispatchRequest,由dispatcherList中的建队列建索引的dispatcher依次处理,这种结构下你也很方便在当中增加自己的dispatcher,如果处理耗时,还可以用线程也来处理。
    4) HAService是一主多从broker之间的高可用,看代码貌似只能实现数据的复制,因为updateHaMasterAddress这个messageStore接口的方法会调用HAService.updateHaMasterAddress,但外部没有调用者,如果有,就应该是由一个实现选主协议的功能调用。这个HAService包含一个AcceptSocketService,HAClient与GroupTransferService,这三个都是前面提到的一个通用的抽象类ServiceThread的实现。每个broker即是serverSocket,管理过来的连接,也是SocketChannel去连接其它broker。
    5) 具体的消息顺序写入commitLog见后一张图。

设计细节:
    1)同样的所有线程池的核心线程与最大线程数都是一样,都是配置的。线程中的队列都是设置好容量的LinkedBlockingQueue,所有的线程池都要命名。
    2)ServiceThread是一个非常常用的的抽象类,实现也很多基础的功能,比如启停,等待等等。值得复用。
    3)Mappedfile记录数据到硬盘,也是值得参考的功能,特别是已经封闭到mappedFileQueue中使用。入盘是非常底层的功能了,越是底层越通用。



4.3. namesvr模块
    (略,有空再学习补充)


4.4 文件存盘与消费队列与索引的建立
  
    这方面的文章很多,也很细,很多byteBuffer操作,主要结构与关系见下图。
注意:

       
  • 最底层对应文件的是MappedFile,对应物理文件与相关的属性信息。
  •    
  • 一个broker上有一个commitLog,包含一个MappedFileQueue,其中又包含了MappedFile组成的一个有queue特点的CopyOnWriteArrayList。
  •    
  • 一个broker上的多个IndexFile其实是一个ArrayList里存放的,一个IndexFile包含一个MappedFile。
  •    
  • 一个broker上的多个ConsumeQueue其实是存放在一个map中(从一个topic中找到它下面的这个map),每个ConsumeQueue包含一个MappedFileQueue。




CommitLog的追加数据分析:
    当从commitLog通过haservice加数据是appendData(long, byte[])比较简单,因为只要和主机一致就行了。
    当主commitLog加消息数据时是putMessage(MessageExtBrokerInner),里面有一句:mappedFile.appendMessage(msg, this.appendMessageCallback),使用了callback。当安排下级办一件事时,过程中还要上级来协调/记录一些事情时,通常派一个内部类给下级,下级使用内部类,而内部类天生就指向了包含它的外部类。当然上级也可以把this传给一个普通类并派下去作为回调,比如DledgerRoleChangeHandler就是这样,不过给外部用的话,又有点监听的意思了,不过用法是相似的。

4.5 HaService分析

   这里选择messagestore的其中一个服务HaService,分析其设计



    1)功能:高可用服务,主要是实现主broker与备broker之间的数据同步的。
    2)使用位置:
Ha是属于defaultMessageStore的,与commitLog平级,但当commitLog在进行putMessages(MessageExtBatch)时,会通过defaultMessageStore使用ha来保证消息除了在主broker上存好,也要在备broker上存好,同步双写。
    3)设计简述
Ha既然属于defaultMessageStore,调用存贮功能的外部模块并不关心。平时的功能是主备broker上的数据同步。如果一个备用broker新上线,就会同步到主broker上的数据,与当前是否在处理新消息无关。
    4)类的设计:
    Ha要通过socket进行数据传输,服务端必然有一个serverSocketChannel接收连接,还有产生的与每一个客户端的socketChannel来进行数据读写,肯定要放在一个列表窗口中。客户端只需要一个连接后产生的socketChannel进行读写就可以了。

    服务端类:
    肯定有serverSocketChannel,而且必然在一个循环中处理客户端的连接,可以包装成一个服务。
    连接后产生的每一个socketChannel,但它只能用来读写数据,然而每一个客户端总有个性化的属性数据,所以每一个socketChannel都要封装起来成一个HAConnection,放在一个List容器中。
    每个HAConnection肯定要记录各自对接的客户端的offset。对其中的socketChannel进行读,读客户端的请求要多少数据。对其中的写肯定是把数据发过去。读写都是不断循环的服务。
    同步双写时,数据要保证也写到的从端,

    客户端类:
    肯定有一个连接产生socketChannel,并用它来进行读写数据。它需要定时向服务端请求数据,告诉自己目前数据到哪了。所以也是一个服务,都是循环进行了。
    最后,把服务端类与客户端类合并在一起,产生HaService,进行优化。

    5)设计感悟:
    参数不要碎片化:getHaSendHeartbeatInterval这个ha的参数,并没有从外部配置给haservice,而用的时候还是通过找到messageStore这个核心类,找到配置类中的值,说明参数不要碎片化传递,或者根本就不要传递,保留一个总引用即可。
    大过程分成小过程:主broker上写消息后,需要同时写到从broker上,很可能我们把这个包装在一个过程中。而实际上,写消息到主broker后,haservice进行不停的同步不用管,只需要一定时间内检查ha写从broker的offset是不是上已经>=这个消息时的offset了,就可以了,这个检查过程循环5次,第次最多等1秒,同步心跳也是5秒。这样一个大的任务,拆分成了小任务,写主并检查同步点的,主从同步的,相互不影响,只通过属性值进行判断。
    小过程之间的关联:这个检查从库offset的请求任务,自带countDownlatch,双写过程发出这个请求任务给haService的服务,并等待这个任务的结果。任务被检查后就触发countDown,这样双写过程就知道结果了,与futurn差不多。

4.6 生产者消费者的管理分析

    对于服务端,持有并维护所有的客户端是非常常用的功能,看看rocketmq是怎么做的:


    图片说明:
    1) brokerController还是核心中的核心,其它的模块都围绕着这个核心类引用着。
    2) 3个浅绿色的就是producer/consumer/filterserver Manager,除了filterserver有一个定时任务外,方法都是被深绿色的模块调用的。
    3) NettyRemotingServer被注入多个处理器processer,还有一个监听器clientHousekeepingService来监听channel的变化。当NettyRemotingServer接收外部的请求后,用相应的处理器处理,比如注册producer/conmsumer等,相应处理器会调用相应的manager的方法进行处理。注意都是通过brokerController调用的,不直接引用。当NettyRemotingServer中的channel本身的channel发生变化时,会通知监听器clientHousekeepingService,后者会让三个mananger发生相应的变化。
    4) consumerManager被监听器通知后,除了自己的动作外,它还会通知监听它的深黄色的consumerIdsChangerListener,而这个监听器又进一步引起另外两个浅黄色的对象发生变化。
    5) 即使是下级别的consumerIdsChangerListener也是被核心类brokerController引用,并在init时设置给别人用的。所以一切都以核心类为桥梁。

    设计思考:
    如果我来设计,三个manager是必须的,必然要管理连接的各种客户端。三个管理很可能每一个自己管理自己的channel的可用性。
    客户端的变化受通讯的消息的影响,也受意外的影响,比如通讯超时。所以两个原因造成管理的对象变化。所以消息处理会调用每个mananger,channel监听会调用manager。作者把这些统一到一个类housekeepingservice中,而我可能会多些类来处理。


4.7 消费者PULL拉消息的处理

    通常对消息的PULL拉取是客户端的主动请求,主动去查看有没有新的消息。
    推消息PUSH是监听回调,如果有消息到来,服务端主动把消息发到客户端。

    PULL一般就直接同步调用,但看到brokerController中有一个PullRequestHoldService,看名字是把请求都放在这里。看到在brokerAllowSuspend && hasSuspendFlag的配置下,如果没有直接找到消息ResponseCode.PULL_NOT_FOUND,这个请求要挂起。
    请求要按topic, queueId组合产生的key存在PullRequestHoldService的ConcurrentHashMap里面。PullRequestHoldService有循环检查这些req的请求offset与实际队列的offset,有消息的时候会通过核心类getPullMessageProcessor().executeRequestWhenWakeup,产生一个runnable对象作为发送查到的消息的任务,给一个线程池来执行。


五、与之前业务的对比
   
    之间做过一个业务,包含服务端与个数不定的客户端,通讯是公司内部开发的基于netty的消息通讯,有点类似,这里说说与基于spring的web应用对接。

    基本结构:



    说明:
    1) 核心类是一个单例,接收请求数据,进行拆分后分发到各个客户端进行处理。
    2) 内部有队列存请求业务,还线程池进行处理,还有监听连接情况的listener。同样也有维护客户端的类。
    3) 业务请求的持久化,最终数据的持久化等部分功能需要外部传入接口的实现类。

    4) 该模块作为Jar包嵌入在一个web容器中,通过http请求接收原始处理请求。除这个业务之外的外部类都由spring进行管理。
    5) 使用了一个component作为spring容器与这个核心类的桥梁,持有核心类同时也通过spring自动持有所有核心类要用的外部类。在component的afterpropetyset中进行核心类的初始化,并把所有外部要的类设置进去,最后启动核心类。
    6) web请求的业务数据,也通过这个component提交给核心类来处理。外部需要了解核心类的数据,也通过component获取。

    当时功能还不是非常多,总体上是通过核心类来处理业务的,但个别地方可能有直接引用造成不规范,看了rockmq的设计,可以参考进一步优化自己的组件。


六、总体感觉

    这是一个以功能为主的中间件,在类的关系上很清晰,没有太多的抽象类与接口,没有各种各样的实现。这个与dubbo有鲜明的对比。
    dubbo作为一个微核心,主功能并不多,不过都是抽象类与接口。整合了各种各样的实现方式,有静态的,有动态的实现,都缓存在extension中。就象一个主板兼容各种各样的CPU,MEM,硬盘,声卡,在已有的产品上做了很多适配器。但如果以常用的配置直接实现dubbo,相对应该是非常简化了。

    工程中的类关系的设计有时候类似于生活中的场景,比如一个公司的组织来处理内外业务,一个工厂厂房内如何加工物品。再想想很多算法,sql处理都与人工处理相似,再比如mapreduce算法与农场分散农民收获水果,并最后分类归并出来很类似。

    最近看到有人每一个action类对应一个validate类,而且还被模仿。说明开发人员没有很好的思考类之间的关系。比如各个销售部门进行合同审核,肯定统一找法务部门。validate肯定很很多被公共使用的东西,应该封装在一起。另外我之前常用医院业务举例,体会业务类,单例,各个科室排队等。现实生活中,由于社会化大生产分工细化,同时又组合成复杂的业务,这如同类的设计,有明确的责任,内聚,单一职责,又共同完成复杂的功能。所以设计项目时进行类比思考,非常有利于设计出良好的结构。

    平常的开发以实现功能为主,所以感觉rocketmq更有学习意义。而dubbo更锻炼抽象能力。
  • 大小: 227.1 KB
  • 大小: 140.1 KB
  • 大小: 218 KB
  • 大小: 181.7 KB
  • 大小: 220.2 KB
  • 大小: 50.2 KB
  • 大小: 284.3 KB
  • 大小: 185.9 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics