RocketMQ 源码分析-Broker
RocketMQ 源码分析-Broker
Broker 作为消息队列的核心,它的职责很多,并且与消息生产者、消息消费者、Namesrv、持久化、ACL等均有关系。本篇只简单介绍什么是 Broker,让读者对 Broker 有个大概认识即可,
Broker 概述
Broker 是 RocketMQ 的核心,它负责消息的传递、消息路由信息的生成、持久化、ACL等等。
Broker 的 pom.xml
可以看到作为消息队列的核心模块,Broker 在依赖中引入了相当多的模块,这里只需要知道每个模块大致功能的即可,记不住就分别写上注释。
注
懒得一个个点开看,可以直接打印依赖树去看。
通过阅读 pom.xml
我们可以知道 Broker 也是基于 Netty 的(rocketmq-remoting),可以 知道它包含持久化功能 rocketmq-store
,它还包含 ACL 功能 rocketmq-acl
等等,这就 不详细罗列了。
启动类
Java 项目找启动类最偷懒的办法就是搜索 main
方法,不过这里我们看文件名就能轻松定位到 BrokerStartup
。
启动流程
BrokerStartup::main
--> BrokerStartup::createBrokerController
--> BrokerStartup::start
--> controller::start
也就是进入 main
方法,然后创建 BrokerController (本质是个 netty server),同时按需初始化一堆东西,最后执行 controller.start()
。
接下来看 Broker 启动时具体做了什么:
// BrokerController::start
public void start() throws Exception {
// 启动持久化
// 这部分在持久化篇展开
if (this.messageStore != null) {
this.messageStore.start();
}
// 启动 netty server
// 持久化涉及到从文件系统恢复消息和消息写入文件系统等,所以要先启动持久化,后启动 netty server
if (this.remotingServer != null) {
this.remotingServer.start();
}
// 启动 netty server
// 与 remotingServer 二选一
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
// 启动一个守护线程 监控文件列表
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
// 启动一个 netty client,用于向外部发送请求
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
// 启动一个守护线程
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
// 启动一个单线程定时任务
// 同时 clientHousekeepingService 是一个跟 brokerController 绑定的 ChannelEventListener
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
// 同样是启动一个单线程定时任务
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
// 非 DLedger CommitLog
// 也就是高可用模式
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
// 元数据同步
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
// 单线程定时任务,Broker 的信息注册到 NameSrv
// 就是用前面的 brokerOuterAPI (netty client) 给 NameSrv list 发注册请求,默认10秒一次。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
// ignore
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
// 单线程定时任务
// 快速失败模式下,用于清理无效请求
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}
所以实际上 Broker 的启动就是读取配置,组装好各个组件,然后启动 Netty Server 提供服务。 同时启动一系列定时任务,其他组件,还有一个 netty client 用来与其他服务通信。
注册到 NameSrv List
这里顺便看一下 broker 向 NameSrv 注册路由信息。
BrokerController::doRegisterBrokerAll
源码略过,可以看到注册流程是:首先组装
topicConfigWrapper
和 Broker 的基本信息(BrokerInfo),然后使用brokerOuterAPI::registerBrokerAll
向 namesrv 发注册请求。由于 NameSrv 可以是集群,
brokerOuterAPI
使用了线程池配合CountDownLatch
进行批量注册,返回所有注册结果registerBrokerResultList
。BrokerInfo
装入 requestHeader,Topic 信息装入 requestBody,请求类型为RequestCode.REGISTER_BROKER
,通过同步调用invokeSync
的方式发起请求,拿到返回结果后解码并本地缓存。如果开启了 Broker 的高可用模式,则取出返回结果列表中第一条数据进行高可用模式的后续处理。这里逻辑实际上是反过来的,无论是否开启高可用模式,只要返回列表不为空就取出第一条结果,然后判断是否为高可用模式。
首先会判断是否为高可用模式,并且要获取到
HaServerAddr
,满足条件则把HaServerAddr
更新为 master。之后则是通用处理,其他 slave Broker 把HaServerAddr
设置为自己的 master broker 的地址。最后则是进行“是否为顺序消息模式”的检查和更新。
至此 NameSrv 中关于 Broker 如何注册的坑就填完了。同时也明确了, Broker 在启动时会把自己的 brokerInfo 和 Topic 信息同时注册到所有 NameSrv 上,并且默认每 10 秒会重新注册一次。由于高可用的相关逻辑也跟注册耦合在一起,因此 master broker 的设置和更新也是随注册触发。
核心功能:接收消息
前面发送消息篇 里面我们了解了消息是如何发给 Broker 的,这里我们就来看看 Broker 是如何处理消息的。
注
涉及持久化的部分略过。
定位代码
对于 Broker 而言,接收客户端发来的消息,自然属于数据输入,再结合大家都基于 netty 进行通信, 那么回顾发送消息的底层代码,我们可以轻松发现这里的数据输入基本就是三种
RequestCode
,SEND_MESSAGE
,SEND_REPLY_MESSAGE_V2 | SEND_REPLY_MESSAGE
(视为一种),SEND_BATCH_MESSAGE
。 也就是单向发送, 需要返回值的发送(包括同步、异步),批量发送。broker 接收到 request 后,首先他要对
request
进行解码并提取出RequestCode
,这样才能分辨是什么请求。因此我们去找 Broker 解码的部分。对于 netty 应用而言,解码器在 netty 语境就是一种 Handler,之前 NameSrv 中有提到,RocketMQ 习惯把 handler 都放在 processor 包里,所以可以去 processor 包里找。当然不是所有的 handler 都值得单独创建一个类,你也可以直接利用 IDE 的搜索功能直接在rocketmq/broker
中搜索对应关键字,比如SEND_MESSAGE
,简单粗暴。 再比如,RocketMQ 的命名方式, BrokerController,在传统 web 项目里,Controller 里会定义 一堆RequestMapping
,实际上 netty 应用中也可以通过自定义 handler 来实现类似的功能,所以 也可以在 Contoller 里找。到这一步就随你喜好了,办法很多,以上几个例子只是抛砖引玉。
SendMessageProcessor
启动流程如果有认真看源码的话,在 createBrokerController
那一步中包含 controller.initialize();
这么一个初始化方法,这个初始化方法中有一个步骤就是注册 processor, this.registerProcessor();
。
而这个 registerProcessor()
实际上就是把 RequestCode
和 handler/processor 以及 Executor(线程池)建立对应关系,有点儿类似 web 中的 RequestMapping
。
这里的 handler/processor 使用的是 processor
包下的 SendMessageProcessor
。
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
从这一段可以看出,无论 Broker 接收到的是生产者的请求还是消费者的请求,都是 SendMessageProcessor
来处理。
然后来看 SendMessageProcessor::processRequest
方法,它并没有对 request
进行分析, 而是直接传入 asyncProcessRequest()
。
asyncProcessRequest()
方法中,只对 RequestCode.CONSUMER_SEND_MSG_BACK
一种请求做了区分, 其他的请求直接进入 default 分支,default 分支中也只区分了是否为批量发送,初步印证了之前"发送消息" 中的三种发送消息的方式对于 Broker 来说其实没啥区别。
这里我们看非批量发送的处理逻辑 asyncSendMessage()
,批量消息就是数据解码的时候批量解码然后遍历处理。
asyncSendMessage()
方法比较长,里面混杂了一部分内部消息对象的组装、一部分 request 的解码、 还有 response 的组装等等。
主要看方法最后那段即可:
// asyncSendMessage()
// transFlag 是事务消息的标记 (带这个属性就是事务消息 PROPERTY_TRANSACTION_PREPARED)
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
//broker 未开启事务处理,直接拒绝
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// 事务消息的处理
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// 普通消息的处理
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
// 最终返回
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
可以看到 Broker 会调用 messageStore::asyncPutMessage()
对消息进行持久化,也就是常说的刷盘。 在asyncPutMessage()
中,实际上真正去存放消息的是 commitLog.asyncPutMessage(msg);
, 而 commitLog.asyncPutMessage(msg);
的主要工作首先是将收到的消息写入 mappedFile
, 也就是 mappedFile.appendMessage(msg, ...);
,之后刷盘 submitFlushRequest(...);
并返回。
commitLog 这里的源码如果看着迷茫可以先跳过,等讲完持久化章节就很容易理解了,这里面涉及到了 mmap 和 RocketMQ 的文件系统设计,只看代码会很蒙,尤其是这项目木有注释。
我们接着往下看最后的返回, handlePutMessageResultFuture(...)
,方法就是给客户端返回响应的方法,主要工作就是等前面的异步调用都完成之后对 response 进行最后的封装,最终 writeAndFlush
。
总结
可以看到,Broker 本质就是一个 Netty Server,一个处理各种请求、返回响应的服务端。它提供/支持的其他功能,比如持久化、高可用等等,都是单独的模块,后面单独去讲。