跳至主要內容

RocketMQ 源码分析-Broker

lament-z大约 7 分钟

RocketMQ 源码分析-Broker

Broker 作为消息队列的核心,它的职责很多,并且与消息生产者、消息消费者、Namesrv、持久化、ACL等均有关系。本篇只简单介绍什么是 Broker,让读者对 Broker 有个大概认识即可,

Broker 概述

Broker 是 RocketMQ 的核心,它负责消息的传递、消息路由信息的生成、持久化、ACL等等。

Broker 的 pom.xml

可以看到作为消息队列的核心模块,Broker 在依赖中引入了相当多的模块,这里只需要知道每个模块大致功能的即可,记不住就分别写上注释。

懒得一个个点开看,可以直接打印依赖树去看。

通过阅读 pom.xml 我们可以知道 Broker 也是基于 Netty 的(rocketmq-remoting),可以 知道它包含持久化功能 rocketmq-store,它还包含 ACL 功能 rocketmq-acl 等等,这就 不详细罗列了。

启动类

Java 项目找启动类最偷懒的办法就是搜索 main 方法,不过这里我们看文件名就能轻松定位到 BrokerStartup

启动流程

BrokerStartup::main --> BrokerStartup::createBrokerController --> BrokerStartup::start --> controller::start

也就是进入 main 方法,然后创建 BrokerController (本质是个 netty server),同时按需初始化一堆东西,最后执行 controller.start()

接下来看 Broker 启动时具体做了什么:

// BrokerController::start
public void start() throws Exception {
      // 启动持久化
      // 这部分在持久化篇展开
      if (this.messageStore != null) {
          this.messageStore.start();
      }

      // 启动 netty server
      // 持久化涉及到从文件系统恢复消息和消息写入文件系统等,所以要先启动持久化,后启动 netty server
      if (this.remotingServer != null) {
          this.remotingServer.start();
      }

      // 启动 netty server
      // 与 remotingServer 二选一
      if (this.fastRemotingServer != null) {
          this.fastRemotingServer.start();
      }

      // 启动一个守护线程 监控文件列表
      if (this.fileWatchService != null) {
          this.fileWatchService.start();
      }

      // 启动一个 netty client,用于向外部发送请求
      if (this.brokerOuterAPI != null) {
          this.brokerOuterAPI.start();
      }

      // 启动一个守护线程
      if (this.pullRequestHoldService != null) {
          this.pullRequestHoldService.start();
      }

      // 启动一个单线程定时任务
      // 同时 clientHousekeepingService 是一个跟 brokerController 绑定的 ChannelEventListener
      if (this.clientHousekeepingService != null) {
          this.clientHousekeepingService.start();
      }

      // 同样是启动一个单线程定时任务
      if (this.filterServerManager != null) {
          this.filterServerManager.start();
      }

      // 非 DLedger CommitLog
      // 也就是高可用模式
      if (!messageStoreConfig.isEnableDLegerCommitLog()) {
          startProcessorByHa(messageStoreConfig.getBrokerRole());
          // 元数据同步
          handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
          this.registerBrokerAll(true, false, true);
      }

      // 单线程定时任务,Broker 的信息注册到 NameSrv
      // 就是用前面的 brokerOuterAPI (netty client) 给 NameSrv list 发注册请求,默认10秒一次。
      this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

          @Override
          public void run() {
              try {
                  BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
              } catch (Throwable e) {
                  log.error("registerBrokerAll Exception", e);
              }
          }
      }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);


      // ignore
      if (this.brokerStatsManager != null) {
          this.brokerStatsManager.start();
      }

      // 单线程定时任务
      // 快速失败模式下,用于清理无效请求
      if (this.brokerFastFailure != null) {
          this.brokerFastFailure.start();
      }

  }  

所以实际上 Broker 的启动就是读取配置,组装好各个组件,然后启动 Netty Server 提供服务。 同时启动一系列定时任务,其他组件,还有一个 netty client 用来与其他服务通信。

注册到 NameSrv List

这里顺便看一下 broker 向 NameSrv 注册路由信息。

  • BrokerController::doRegisterBrokerAll

    源码略过,可以看到注册流程是:首先组装 topicConfigWrapper 和 Broker 的基本信息(BrokerInfo),然后使用 brokerOuterAPI::registerBrokerAll 向 namesrv 发注册请求。

    由于 NameSrv 可以是集群,brokerOuterAPI 使用了线程池配合 CountDownLatch 进行批量注册,返回所有注册结果registerBrokerResultListBrokerInfo 装入 requestHeader,Topic 信息装入 requestBody,请求类型为 RequestCode.REGISTER_BROKER,通过同步调用invokeSync的方式发起请求,拿到返回结果后解码并本地缓存。

    如果开启了 Broker 的高可用模式,则取出返回结果列表中第一条数据进行高可用模式的后续处理。这里逻辑实际上是反过来的,无论是否开启高可用模式,只要返回列表不为空就取出第一条结果,然后判断是否为高可用模式。

    首先会判断是否为高可用模式,并且要获取到HaServerAddr,满足条件则把 HaServerAddr 更新为 master。之后则是通用处理,其他 slave Broker 把 HaServerAddr 设置为自己的 master broker 的地址。

    最后则是进行“是否为顺序消息模式”的检查和更新。

    至此 NameSrv 中关于 Broker 如何注册的坑就填完了。同时也明确了, Broker 在启动时会把自己的 brokerInfo 和 Topic 信息同时注册到所有 NameSrv 上,并且默认每 10 秒会重新注册一次。由于高可用的相关逻辑也跟注册耦合在一起,因此 master broker 的设置和更新也是随注册触发。

核心功能:接收消息

前面发送消息篇open in new window 里面我们了解了消息是如何发给 Broker 的,这里我们就来看看 Broker 是如何处理消息的。

涉及持久化的部分略过。

  • 定位代码

    对于 Broker 而言,接收客户端发来的消息,自然属于数据输入,再结合大家都基于 netty 进行通信, 那么回顾发送消息的底层代码,我们可以轻松发现这里的数据输入基本就是三种 RequestCodeSEND_MESSAGESEND_REPLY_MESSAGE_V2 | SEND_REPLY_MESSAGE (视为一种), SEND_BATCH_MESSAGE。 也就是单向发送, 需要返回值的发送(包括同步、异步),批量发送。

    broker 接收到 request 后,首先他要对 request 进行解码并提取出 RequestCode,这样才能分辨是什么请求。因此我们去找 Broker 解码的部分。对于 netty 应用而言,解码器在 netty 语境就是一种 Handler,之前 NameSrv 中有提到,RocketMQ 习惯把 handler 都放在 processor 包里,所以可以去 processor 包里找。当然不是所有的 handler 都值得单独创建一个类,你也可以直接利用 IDE 的搜索功能直接在 rocketmq/broker 中搜索对应关键字,比如 SEND_MESSAGE ,简单粗暴。 再比如,RocketMQ 的命名方式, BrokerController,在传统 web 项目里,Controller 里会定义 一堆 RequestMapping,实际上 netty 应用中也可以通过自定义 handler 来实现类似的功能,所以 也可以在 Contoller 里找。

    到这一步就随你喜好了,办法很多,以上几个例子只是抛砖引玉。

SendMessageProcessor

启动流程如果有认真看源码的话,在 createBrokerController 那一步中包含 controller.initialize(); 这么一个初始化方法,这个初始化方法中有一个步骤就是注册 processor, this.registerProcessor();

而这个 registerProcessor() 实际上就是把 RequestCode 和 handler/processor 以及 Executor(线程池)建立对应关系,有点儿类似 web 中的 RequestMapping

这里的 handler/processor 使用的是 processor 包下的 SendMessageProcessor

sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

从这一段可以看出,无论 Broker 接收到的是生产者的请求还是消费者的请求,都是 SendMessageProcessor 来处理。

然后来看 SendMessageProcessor::processRequest 方法,它并没有对 request 进行分析, 而是直接传入 asyncProcessRequest()

asyncProcessRequest()方法中,只对 RequestCode.CONSUMER_SEND_MSG_BACK 一种请求做了区分, 其他的请求直接进入 default 分支,default 分支中也只区分了是否为批量发送,初步印证了之前"发送消息" 中的三种发送消息的方式对于 Broker 来说其实没啥区别。

这里我们看非批量发送的处理逻辑 asyncSendMessage(),批量消息就是数据解码的时候批量解码然后遍历处理。

asyncSendMessage() 方法比较长,里面混杂了一部分内部消息对象的组装、一部分 request 的解码、 还有 response 的组装等等。

主要看方法最后那段即可:

// asyncSendMessage()
// transFlag 是事务消息的标记 (带这个属性就是事务消息 PROPERTY_TRANSACTION_PREPARED)
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
    //broker 未开启事务处理,直接拒绝
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                      + "] sending transaction message is forbidden");      
        return CompletableFuture.completedFuture(response);
    }
     // 事务消息的处理
    putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    // 普通消息的处理
    putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
  // 最终返回
 return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);

可以看到 Broker 会调用 messageStore::asyncPutMessage() 对消息进行持久化,也就是常说的刷盘。 在asyncPutMessage() 中,实际上真正去存放消息的是 commitLog.asyncPutMessage(msg);, 而 commitLog.asyncPutMessage(msg); 的主要工作首先是将收到的消息写入 mappedFile, 也就是 mappedFile.appendMessage(msg, ...);,之后刷盘 submitFlushRequest(...); 并返回。

commitLog 这里的源码如果看着迷茫可以先跳过,等讲完持久化章节就很容易理解了,这里面涉及到了 mmap 和 RocketMQ 的文件系统设计,只看代码会很蒙,尤其是这项目木有注释。

我们接着往下看最后的返回, handlePutMessageResultFuture(...),方法就是给客户端返回响应的方法,主要工作就是等前面的异步调用都完成之后对 response 进行最后的封装,最终 writeAndFlush

总结

可以看到,Broker 本质就是一个 Netty Server,一个处理各种请求、返回响应的服务端。它提供/支持的其他功能,比如持久化、高可用等等,都是单独的模块,后面单独去讲。

上次编辑于:
贡献者: Lament