跳至主要內容

RocketMQ 源码分析-高可用 HA

lament-z大约 13 分钟

RocketMQ 源码分析-高可用

高可用基本是分布式系统必备特性之一,实现方案通常为主从机制。

RocketMQ 的高可用就挺特别,它有主从同步但是没有主从切换,读写分离也并不彻底, 正常情况下所有消费者都是从 master 上拉取消息,只有积压消息超过 master 物理内存的 40%时,消费者切换到 slave 上进行消费,此时其实才是“读写分离”的。当 slave 上的积压消息小于 30% 时,消费者会再次切换回 master 进行消费。

主从同步

RocketMQ 的主从同步实际上就是 Broker 的主从同步,每当消息送达 Broker-Master 时,要同步到 Broker-Slave,这样一旦 Master 挂了,Consumer 可以从 Slave 拉取消息。

接下来看代码,源码位置:rocketmq/store.ha 的 org.apache.rocketmq.store.ha 包。

机制概述

Broker 主从机制的基本流程如下:

  1. Step1:Master 启动并监听端口

    Broker-Master 启动时会监听 MessageStoreConfig.haListenPort 端口,等待 Slave 的连接。

  2. Step2:Slave 启动并主动连接 Master

    就是 Slave 主动与上一步的 Broker:haListenPort 建立链接。

  3. Step3:同步消息

    链接建立后,Slave 向 Master 发送待拉取消息的 offset,Master 根据请求返回消息给 Slave。

  4. Step4:保存 & 继续同步

    Slave 保存收到的消息后继续重复以上流程。

接下来我们就根据这个流程来看 HA 的代码。

Step1

首先是 Broker-Master 启动并监听端口,这一步的入口实际上就是在Broker-启动流程open in new window中没有详细展开的 BrokerStartup::createBrokerController 方法里。

这里我们来看一下它:

  • BrokerStartup::createBrokerController

    感觉 Broker 的下篇都没啥可写的了,都分散到其他章节里去了,回头重新调整一下 Broker 篇的内容。

    下面是设置监听端口(haListenPort):

// BrokerStartup::createBrokerController
// 只看设置监听端口部分 其余均省略
// listenPort 默认值虽然是 8888,但这里直接设置成了 10911
nettyServerConfig.setListenPort(10911);

// haListenPort 的逻辑值是 listenPort + 1
// 默认值是 10912,实际值算出来也是 10911 + 1 = 10912
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);

// 根据角色设置 master 的 BrokerID
// BrokerID 应该自行在配置文件中设置好,master 为 0,slave > 0,多个 slave 要区分开
switch (messageStoreConfig.getBrokerRole()) {
    case ASYNC_MASTER:
    case SYNC_MASTER: // Master 的 BrokerID 为 0
        brokerConfig.setBrokerId(MixAll.MASTER_ID);
        break;
    // ...
}                          

然后在 BrokerController::start 中找到 this.messageStore.start();,HA 属于存储模块,在 DefaultMessageStore::start() 中找到 haService.start();

4.x 版本

注意 分布式 CommitLog,或者说 DLedger 与高可用模式不兼容

// from DefaultMessageStore::start()
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    this.haService.start();
    this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}

然后来看主角 HAService

  • HAService::start
public void start() throws Exception {
    // 监听 haListenPort 端口, NIO   
    this.acceptSocketService.beginAccept();
    // 下面三个都是服务线程, ServiceThread  
    this.acceptSocketService.start();
    this.groupTransferService.start();
    this.haClient.start();
}

至此,第一步的实现已经看完了,Master 在创建 BrokerController 时会设置 haListenPort 的值,存储模块启动时会启动 haService,haService 启动时会监听 haListenPort 等待 Slave 的连接。

Step2

Slave 启动并主动连接 Master。

Broker-Slave 的 BrokerID 是用户自己在配置文件里指定的,必须大于0,而且同一个 Master 如果你配置多个 slave 的话,他们的 brokerID 不能相同。

启动流程就省略了,只看它连接 Master,这部分代码就在 this.haClient.start();

HAClient 是服务线程,直接去看它的 run 方法,可以找到 this.connectMaster()

  • HAClient::connectMaster
private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) { // sc 为空,则尝试与 master 建立连接
        String addr = this.masterAddress.get();
        if (addr != null) {
            // 转换类型,从 String 到 InetSocketAddress
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                // 连接到 master
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                  // 注册 OP_READ 事件,不认识的话,你就把它理解为网络IO事件的读事件
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}

提示

注意 master 和 slave 建立 tls 链接这部分都是 java nio,不是 netty 语境。

Step3 & Step4

同步消息,保存,重复以上流程。

回到 HAClient::run

// HAClient::run
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {   
            if (this.connectMaster()) { //step2
                // 只要 channel 还在就会进入这里
                if (this.isTimeToReportOffset()) { // 距离上次 ReportOffset 超过 5s 就为 true

                    // Step3 中的上报 offset
                    // 向 master 上报 slave 端当前最大偏移量
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster(); // 关闭 channel
                    }
                }

                this.selector.select(1000);

                // step3 中的 master 返回消息,以及 step 4 中的保存数据
                // 这里的 ReadEvent 就是 IO 概念里的读事件
                // 也就是 slave 读取 master 根据 slaveMaxOffset 返回的待同步消息
                // 具体点说就是: 先从 channel 中读数据,一次最多读 4MB,读到数据后调用
                // dispatchReadRequest 方法,这个方法会解码读到的数据,并 append 到 slave 的 commitLog 文件中。
                // 保存消息的流程,详见持久化篇 defaultMessageStore.appendToCommitLog -> .commitLog.appendData -> mappedFile.appendMessage
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }

                // 更新 haService.currentReportedOffset
                // 并再次上报 offset
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                // 写入间隔超过 20s,则认为链接无效,关闭 channel。        
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                // slave 未能和 master 建立链接,则 5s 后重试
                // 利用自定义的 CountDownLatch 实现的,叫 CountDownLatch2,有兴趣可以自己去看
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }
    log.info(this.getServiceName() + " service end");
}

高可用流程的实现就基本看完啦,无论是之前的 netty 还是这里的 Java NIO 部分都不在讲解范围内,因此都略过啦,有兴趣可以自己去看。

然后来看一下 HA 核心类。

HAService

HAService 就是高可用的核心实现类,它有三个内部类:AcceptSocketServiceGroupTransferServiceHAClient。它内部还维护一个 HAConnection 列表(private final List<HAConnection> connectionList = new LinkedList<>();)。

我们挨个来看,首先是 AcceptSocketService:

  • AcceptSocketService

    AcceptSocketService 是个服务线程,还是先看 run 方法,它的主要工作就是监听 haListenPort 端口上的 IO 事件/状态,只要满足条件就建立链接,并且创建一个 HAConnection 对象,并存入 this.connectionList

    而前面提到的另一个方法 AcceptSocketService::beginAccept,就是为监听端口的准备工作,因此要在 AcceptSocketService 启动前执行。

    说白了,AcceptSocketService 就是 master 用来监听 slave 连接的实现类。

  • GroupTransferService

GroupTransferService 的 run 方法逻辑为每 10ms 执行一次 doWaitTransfer 方法。

private void doWaitTransfer() {
    if (!this.requestsRead.isEmpty()) {
        for (CommitLog.GroupCommitRequest req : this.requestsRead) {

            // tansferOK 就是主从同步是否完成的标记。
            // req.getNextOffset()  是从 master 传回来的下一条新消息的 offset
            // push2SlaveMaxOffset 是 salve 目前已同步的最大 offset
            // 当 slave 上同步的 offset >= master 上新消息的起始位置,说明 salve 已经追平 master 的进度,甚至超出
            boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
                    + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();

            // 未追平时,该循环运行间隔为 1s,SyncFlushTimeout 为 5s,所以 requestsRead 中的每个 req 通常会循环5次
            // 一旦追平 or 超时都会退出循环
            while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                this.notifyTransferObject.waitForRunning(1000);
                transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            }

            if (!transferOK) {
                log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
            }
            // 这里是给 producer 的返回,追平说明主从刷盘均成功;如果前面哪个循环中没有追平,则返回 Slave刷盘超时的状态
            req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
        }
        // 将 读列表 至空
        this.requestsRead = new LinkedList<>();
    }
}

由上述代码我们可以得出一个结论,GroupTransferService 的主要工作就是监视 slave 的同步状态,以便为发送消息的 producer 返回响应。

然后来看一下 GroupTransferService 的其他内容,首先是它内部创建了俩队列,requestsWriterequestsRead。这个读写队列跟前几篇讲的读写分离技巧是一样的,两个队列交换的时机是: run -> waitForRunning(10) -> this.onWaitEnd();

还有一个方法的逻辑要看,那就是 notifyTransferSome,不过 GroupTransferService::notifyTransferSome 只是唤醒逻辑,完整处理逻辑在主类的同名方法里:

// HAService::notifyTransferSome
public void notifyTransferSome(final long offset) {
    // 这里的 offset 参数可以直接看作 新的 push2SlaveMaxOffset 的值
    // 或者对于 value 来说就是 newValue
    for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
        // 更新 push2SlaveMaxOffset
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            // 唤醒 GroupTransferService
            this.groupTransferService.notifyTransferSome();
            break;
        } else {
            value = this.push2SlaveMaxOffset.get();
        }
    }
}

这个方法的调用时机时机是 master 收到 slave 的拉取请求后,细节后面会讲。

  • HAClient

HAClient 就要复杂一些,这个类 master/slave 都会用到。它主要负责处理各种业务逻辑。 比如前面我们过流程的时候提到的连接 master 就是它的 connectMaster 方法。

这阅读部分代码需要注意一点,以 connectMaster 方法为例,很明显这个方法就是给 slave 用的,但实际上 master 也会进入这个方法。

HAClient 的核心方法在前面分析流程的时候已经解释的差不多了,只不过因为大部分都是 IO 代码,没有单独贴出来。

这里简单说说其他没出现过的方法。

HAClient::updateMasterAddress,这个顾名思义,更新 Master 地址,发生在 slave 的 masterAddress 为空,或者收到新地址时。

HAClient::reallocateByteBufferHAClient::swapByteBuffer 两个方法合起来实现了 byteBufferRead 的读写状态切换功能,这俩方法就相当于是手动实现了一遍 this.byteBufferRead.flip(); 。 这个也是 IO 的内容,给看不懂的小伙伴简单解释下,在这里数据都是加密的数据流的形式,你可以把他想成一个长度未知的一维数组,里面存储的都是加密(encode)后的数据,数据此时就像水管里的水流,(读事件)读数据的时候要么是读到没数据为止,要么是读固定长度,这个过程你就可以想成一个指向数组下标的指针在不断移动,读到哪就指到哪;而当你要切换到写状态时,你肯定是要把数据从头开始写,也就是下标 0 的位置,但是你指针现在在读的位置。byteBuffer::flip() 就是帮你把下标置换回 0,详细说明可以看它的注释,解释的很好,还给了用例。由于本文跳过了关于 netty 和 IO 部分的代码,实际上前面几篇中 netty 部分都是用 flip() 的。对 IO 不熟悉的小伙伴强烈建议跟着 netty 文档动手写一遍文档里所有的 demo,这样什么 handle、encode/decode、粘包处理等等常见问题都会有清晰的认识 。

HAConnection

这个类就是用来管理连接和网络 IO 读写的类,他有俩内部类,ReadSocketServiceWriteSocketService,这俩 Service 也是服务线程。

HAConnection 的构造方法其实就是创建一个“链接:connection”,并同时为该“链接”创建读写两个线程。 其实就是把 slave 与 master 进行网络通讯的 socketChannel 封装成了一个链接对象。

然后来看它的俩属性,slaveRequestOffset 属性就是 slave 要请求的 offset,默认值 -1,代表拉取还未开始,请求行为会直接放弃,0 代表按照 slave 当前的 maxOffset 拉取,其余情况就是按它的真实值拉取。

slaveAckOffset,slave 向 master 反馈已同步的偏移量。这个值就是 HAService::notifyTransferSome 方法的入参。

了解以上他们的含义后,分析读写线程的业务逻辑就比较简单了,二者实际上非常相似,基本流程都是先 encode 数据,然后调用 ReadSocketService::processReadEvent / WriteSocketService::transferData 完成网络传输(包括粘包处理等),传输完成后进行 decode,如果有其他逻辑则执行。最终释放各种资源,并销毁线程。

主从切换

主从切换/读写分离的大致逻辑就不再重复了,这里直接来看这部分逻辑是如何实现的。

首先来看,Consumer 如何找到具体的 Broker 的,前面在消费篇里提过,Consumer 根据 MessageQueue 去确定 Broker。这里我们更进一步,去看之前没继续展开的调用链。

通过消费篇我们知道,负责从 broker 拉取消息的是 DefaultMQPushConsumerImpl::pullMessage 方法,该方法最终是通过调用 this.pullAPIWrapper.pullKernelImpl(...) 来完成消息拉取的,而这个方法里第一步就是根据 MessageQueue 找 Broker:

// mq 参数的类型就是 MessageQueue
FindBrokerResult findBrokerResult =
         this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);

我们要找到俩方法都在这里了,一个是 findBrokerAddressInSubscribe,一个是 recalculatePullFromWhichNode

findBrokerAddressInSubscribe 方法的功能就是根据 BrokerName 和 BrokerID 找到对应的 BrokerAddress,然后把相关信息塞进 FindBrokerResult 对象中并返回。

BrokerName 的值来自 mq.getBrokerName,这个没啥可说的,主要看 brokerID,也就是 this.recalculatePullFromWhichNode(mq)

public long recalculatePullFromWhichNode(final MessageQueue mq) {
     // 这个不用看,这是用户指定
     if (this.isConnectBrokerByUser()) {
         return this.defaultBrokerId;
     }

     // 主从是否要切换就取决于 suggest
     AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
     if (suggest != null) {
         return suggest.get();
     }

     // 默认只访问 master  
     return MixAll.MASTER_ID;
 }

从代码中我们可以看出,Consumer 是否切换 broker 就取决于 suggest 的值,(pullFromWhichNodeTable 中以 MessageQueue 为 key,suggest 为 value)。那么 suggest 的值是哪来的,在 processPullResultthis.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); 这一步。

//updatePullFromWhichNode
public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
     AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
     if (null == suggest) { // 为空则创建,相当于初始化
         this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
     } else {
         // 非空则 update BrokerID
         suggest.set(brokerId);
     }
 }

也就是说 suggest 是由 Broker 反馈给 Consumer 的。那么我们去 Broker 看 suggest 如何生成。

前文提到了,大于 40% 时由主切从,小于 30% 由从切主,先来看这个值的初始化:

// BrokerStartup.class
// 物理内存最大占比, 40 就是 40%,Slave 是 30%
// master 40% 是切换 slave 的阈值
// slave 30% 是切回 master 的阈值
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
    int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
    messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}

然后来看:

// DefaultMessageStore::getMessage  
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
   * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
// 这里决定是否主从切换
getResult.setSuggestPullingFromSlave(diff > memory);

之后在 PullMessageProcessor::processRequest 中设置 consumer 拿到的 brokerID,如下:

// DefaultMessageStore::getMessage 就在这段代码前调用
// 根据 suggest 设置 brokerID
if (getMessageResult.isSuggestPullingFromSlave()) {
    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}

PullMessageProcessor::processRequest 做个简单说明,PullMessageProcessor 可以简单看作负责处理请求的类,processRequest 就是处理请求的方法。

元数据同步

本章节目录结构来自《RocketMQ 技术内幕》。

这里所说的元数据指的是,Broker 中存储的 topic、消费组、消费进度等信息。

前两个大章节介绍了 HA 的主从同步和主从切换,除了以上功能外,master 宕机时,虽然无法写入消息,但是消费短期内是不受影响的,consumer 依然可以从 slave 上进行消费。master 恢复后,slave 可以把“元数据”,比如集群模式的消费进度,主动同步回 master。

Slave 自动同步元数据

这个同步是单向的,只会是 slave 向 master 同步数据,这就是 broker 篇启动流程中的 BrokerController::handleSlaveSynchronize 方法。

实现方式就是通过定时任务,每 10s 与 master 同步一次数据。

private void handleSlaveSynchronize(BrokerRole role) {
      if (role == BrokerRole.SLAVE) {
          if (null != slaveSyncFuture) {
              slaveSyncFuture.cancel(false);
          }
          this.slaveSynchronize.setMasterAddr(null);
          slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
              @Override
              public void run() {
                  try {
                      // 同步
                      BrokerController.this.slaveSynchronize.syncAll();
                  }
                  catch (Throwable e) {
                      log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                  }
              }
          }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
      } else {
          //handle the slave synchronise
          if (null != slaveSyncFuture) {
              slaveSyncFuture.cancel(false);
          }
          this.slaveSynchronize.setMasterAddr(null);
      }
  }

具体同步哪些数据可以可根据代码自行查看:

public void syncAll() {
    this.syncTopicConfig();
    this.syncConsumerOffset();
    this.syncDelayOffset();
    this.syncSubscriptionGroupConfig();
}

消费进度同步

由于消费进度变化很快,只靠 10s 一次的同步显然有点慢,因此还有另外一种方式,就是通过 consumer 发来的 pullMessage 请求,如果其中包含的消费进度的信息,就直接拿来更新本地消费进度。

这部分代码也在 PullMessageProcessor::processRequest,感兴趣自行查看。

总结

高可用也写完啦,撒花 😃

关于 RocketMQ 的文章至此也就暂时告一段落啦,后续会重新调整一下 Broker 篇的内容,以及纠错等等。

消息轨迹、ACL 啥的就不更啦。

未来关于 RocketMQ 的文章大概就是找个合适的时间把 5.0 的内容放上来。

上次编辑于:
贡献者: Lament