RocketMQ 源码分析-高可用 HA
RocketMQ 源码分析-高可用
高可用基本是分布式系统必备特性之一,实现方案通常为主从机制。
RocketMQ 的高可用就挺特别,它有主从同步但是没有主从切换,读写分离也并不彻底, 正常情况下所有消费者都是从 master 上拉取消息,只有积压消息超过 master 物理内存的 40%时,消费者切换到 slave 上进行消费,此时其实才是“读写分离”的。当 slave 上的积压消息小于 30% 时,消费者会再次切换回 master 进行消费。
主从同步
RocketMQ 的主从同步实际上就是 Broker 的主从同步,每当消息送达 Broker-Master 时,要同步到 Broker-Slave,这样一旦 Master 挂了,Consumer 可以从 Slave 拉取消息。
接下来看代码,源码位置:rocketmq/store.ha 的 org.apache.rocketmq.store.ha
包。
机制概述
Broker 主从机制的基本流程如下:
Step1:Master 启动并监听端口
Broker-Master 启动时会监听 MessageStoreConfig.haListenPort 端口,等待 Slave 的连接。
Step2:Slave 启动并主动连接 Master
就是 Slave 主动与上一步的 Broker:haListenPort 建立链接。
Step3:同步消息
链接建立后,Slave 向 Master 发送待拉取消息的 offset,Master 根据请求返回消息给 Slave。
Step4:保存 & 继续同步
Slave 保存收到的消息后继续重复以上流程。
接下来我们就根据这个流程来看 HA 的代码。
Step1
首先是 Broker-Master 启动并监听端口,这一步的入口实际上就是在Broker-启动流程中没有详细展开的 BrokerStartup::createBrokerController
方法里。
这里我们来看一下它:
BrokerStartup::createBrokerController
感觉 Broker 的下篇都没啥可写的了,都分散到其他章节里去了,回头重新调整一下 Broker 篇的内容。
下面是设置监听端口(haListenPort):
// BrokerStartup::createBrokerController
// 只看设置监听端口部分 其余均省略
// listenPort 默认值虽然是 8888,但这里直接设置成了 10911
nettyServerConfig.setListenPort(10911);
// haListenPort 的逻辑值是 listenPort + 1
// 默认值是 10912,实际值算出来也是 10911 + 1 = 10912
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
// 根据角色设置 master 的 BrokerID
// BrokerID 应该自行在配置文件中设置好,master 为 0,slave > 0,多个 slave 要区分开
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER: // Master 的 BrokerID 为 0
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
// ...
}
然后在 BrokerController::start
中找到 this.messageStore.start();
,HA 属于存储模块,在 DefaultMessageStore::start()
中找到 haService.start();
。
4.x 版本
注意 分布式 CommitLog,或者说 DLedger 与高可用模式不兼容
// from DefaultMessageStore::start()
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
然后来看主角 HAService
。
- HAService::start
public void start() throws Exception {
// 监听 haListenPort 端口, NIO
this.acceptSocketService.beginAccept();
// 下面三个都是服务线程, ServiceThread
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();
}
至此,第一步的实现已经看完了,Master 在创建 BrokerController 时会设置 haListenPort 的值,存储模块启动时会启动 haService,haService 启动时会监听 haListenPort 等待 Slave 的连接。
Step2
Slave 启动并主动连接 Master。
Broker-Slave 的 BrokerID 是用户自己在配置文件里指定的,必须大于0,而且同一个 Master 如果你配置多个 slave 的话,他们的 brokerID 不能相同。
启动流程就省略了,只看它连接 Master,这部分代码就在 this.haClient.start();
。
HAClient
是服务线程,直接去看它的 run
方法,可以找到 this.connectMaster()
。
- HAClient::connectMaster
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) { // sc 为空,则尝试与 master 建立连接
String addr = this.masterAddress.get();
if (addr != null) {
// 转换类型,从 String 到 InetSocketAddress
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
// 连接到 master
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
// 注册 OP_READ 事件,不认识的话,你就把它理解为网络IO事件的读事件
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
提示
注意 master 和 slave 建立 tls 链接这部分都是 java nio,不是 netty 语境。
Step3 & Step4
同步消息,保存,重复以上流程。
回到 HAClient::run
:
// HAClient::run
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
if (this.connectMaster()) { //step2
// 只要 channel 还在就会进入这里
if (this.isTimeToReportOffset()) { // 距离上次 ReportOffset 超过 5s 就为 true
// Step3 中的上报 offset
// 向 master 上报 slave 端当前最大偏移量
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster(); // 关闭 channel
}
}
this.selector.select(1000);
// step3 中的 master 返回消息,以及 step 4 中的保存数据
// 这里的 ReadEvent 就是 IO 概念里的读事件
// 也就是 slave 读取 master 根据 slaveMaxOffset 返回的待同步消息
// 具体点说就是: 先从 channel 中读数据,一次最多读 4MB,读到数据后调用
// dispatchReadRequest 方法,这个方法会解码读到的数据,并 append 到 slave 的 commitLog 文件中。
// 保存消息的流程,详见持久化篇 defaultMessageStore.appendToCommitLog -> .commitLog.appendData -> mappedFile.appendMessage
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
// 更新 haService.currentReportedOffset
// 并再次上报 offset
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
// 写入间隔超过 20s,则认为链接无效,关闭 channel。
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
// slave 未能和 master 建立链接,则 5s 后重试
// 利用自定义的 CountDownLatch 实现的,叫 CountDownLatch2,有兴趣可以自己去看
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
高可用流程的实现就基本看完啦,无论是之前的 netty 还是这里的 Java NIO 部分都不在讲解范围内,因此都略过啦,有兴趣可以自己去看。
然后来看一下 HA 核心类。
HAService
HAService 就是高可用的核心实现类,它有三个内部类:AcceptSocketService
、GroupTransferService
、HAClient
。它内部还维护一个 HAConnection
列表(private final List<HAConnection> connectionList = new LinkedList<>();
)。
我们挨个来看,首先是 AcceptSocketService
:
AcceptSocketService
AcceptSocketService
是个服务线程,还是先看 run 方法,它的主要工作就是监听 haListenPort 端口上的 IO 事件/状态,只要满足条件就建立链接,并且创建一个HAConnection
对象,并存入this.connectionList
。而前面提到的另一个方法
AcceptSocketService::beginAccept
,就是为监听端口的准备工作,因此要在AcceptSocketService
启动前执行。说白了,
AcceptSocketService
就是 master 用来监听 slave 连接的实现类。GroupTransferService
GroupTransferService
的 run 方法逻辑为每 10ms 执行一次 doWaitTransfer
方法。
private void doWaitTransfer() {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
// tansferOK 就是主从同步是否完成的标记。
// req.getNextOffset() 是从 master 传回来的下一条新消息的 offset
// push2SlaveMaxOffset 是 salve 目前已同步的最大 offset
// 当 slave 上同步的 offset >= master 上新消息的起始位置,说明 salve 已经追平 master 的进度,甚至超出
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
// 未追平时,该循环运行间隔为 1s,SyncFlushTimeout 为 5s,所以 requestsRead 中的每个 req 通常会循环5次
// 一旦追平 or 超时都会退出循环
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
// 这里是给 producer 的返回,追平说明主从刷盘均成功;如果前面哪个循环中没有追平,则返回 Slave刷盘超时的状态
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
// 将 读列表 至空
this.requestsRead = new LinkedList<>();
}
}
由上述代码我们可以得出一个结论,GroupTransferService
的主要工作就是监视 slave 的同步状态,以便为发送消息的 producer 返回响应。
然后来看一下 GroupTransferService
的其他内容,首先是它内部创建了俩队列,requestsWrite
和 requestsRead
。这个读写队列跟前几篇讲的读写分离技巧是一样的,两个队列交换的时机是: run -> waitForRunning(10) -> this.onWaitEnd();
。
还有一个方法的逻辑要看,那就是 notifyTransferSome
,不过 GroupTransferService::notifyTransferSome
只是唤醒逻辑,完整处理逻辑在主类的同名方法里:
// HAService::notifyTransferSome
public void notifyTransferSome(final long offset) {
// 这里的 offset 参数可以直接看作 新的 push2SlaveMaxOffset 的值
// 或者对于 value 来说就是 newValue
for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
// 更新 push2SlaveMaxOffset
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
if (ok) {
// 唤醒 GroupTransferService
this.groupTransferService.notifyTransferSome();
break;
} else {
value = this.push2SlaveMaxOffset.get();
}
}
}
这个方法的调用时机时机是 master 收到 slave 的拉取请求后,细节后面会讲。
- HAClient
HAClient
就要复杂一些,这个类 master/slave 都会用到。它主要负责处理各种业务逻辑。 比如前面我们过流程的时候提到的连接 master 就是它的 connectMaster
方法。
这阅读部分代码需要注意一点,以 connectMaster
方法为例,很明显这个方法就是给 slave 用的,但实际上 master 也会进入这个方法。
HAClient
的核心方法在前面分析流程的时候已经解释的差不多了,只不过因为大部分都是 IO 代码,没有单独贴出来。
这里简单说说其他没出现过的方法。
HAClient::updateMasterAddress
,这个顾名思义,更新 Master 地址,发生在 slave 的 masterAddress 为空,或者收到新地址时。
HAClient::reallocateByteBuffer
和 HAClient::swapByteBuffer
两个方法合起来实现了 byteBufferRead
的读写状态切换功能,这俩方法就相当于是手动实现了一遍 this.byteBufferRead.flip();
。 这个也是 IO 的内容,给看不懂的小伙伴简单解释下,在这里数据都是加密的数据流的形式,你可以把他想成一个长度未知的一维数组,里面存储的都是加密(encode)后的数据,数据此时就像水管里的水流,(读事件)读数据的时候要么是读到没数据为止,要么是读固定长度,这个过程你就可以想成一个指向数组下标的指针在不断移动,读到哪就指到哪;而当你要切换到写状态时,你肯定是要把数据从头开始写,也就是下标 0 的位置,但是你指针现在在读的位置。byteBuffer::flip()
就是帮你把下标置换回 0,详细说明可以看它的注释,解释的很好,还给了用例。由于本文跳过了关于 netty 和 IO 部分的代码,实际上前面几篇中 netty 部分都是用 flip()
的。对 IO 不熟悉的小伙伴强烈建议跟着 netty 文档动手写一遍文档里所有的 demo,这样什么 handle、encode/decode、粘包处理等等常见问题都会有清晰的认识 。
HAConnection
这个类就是用来管理连接和网络 IO 读写的类,他有俩内部类,ReadSocketService
和 WriteSocketService
,这俩 Service 也是服务线程。
HAConnection
的构造方法其实就是创建一个“链接:connection”,并同时为该“链接”创建读写两个线程。 其实就是把 slave 与 master 进行网络通讯的 socketChannel 封装成了一个链接对象。
然后来看它的俩属性,slaveRequestOffset
属性就是 slave 要请求的 offset,默认值 -1,代表拉取还未开始,请求行为会直接放弃,0 代表按照 slave 当前的 maxOffset 拉取,其余情况就是按它的真实值拉取。
slaveAckOffset
,slave 向 master 反馈已同步的偏移量。这个值就是 HAService::notifyTransferSome
方法的入参。
了解以上他们的含义后,分析读写线程的业务逻辑就比较简单了,二者实际上非常相似,基本流程都是先 encode 数据,然后调用 ReadSocketService::processReadEvent / WriteSocketService::transferData
完成网络传输(包括粘包处理等),传输完成后进行 decode,如果有其他逻辑则执行。最终释放各种资源,并销毁线程。
主从切换
主从切换/读写分离的大致逻辑就不再重复了,这里直接来看这部分逻辑是如何实现的。
首先来看,Consumer 如何找到具体的 Broker 的,前面在消费篇里提过,Consumer 根据 MessageQueue
去确定 Broker。这里我们更进一步,去看之前没继续展开的调用链。
通过消费篇我们知道,负责从 broker 拉取消息的是 DefaultMQPushConsumerImpl::pullMessage
方法,该方法最终是通过调用 this.pullAPIWrapper.pullKernelImpl(...)
来完成消息拉取的,而这个方法里第一步就是根据 MessageQueue
找 Broker:
// mq 参数的类型就是 MessageQueue
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);
我们要找到俩方法都在这里了,一个是 findBrokerAddressInSubscribe
,一个是 recalculatePullFromWhichNode
。
findBrokerAddressInSubscribe
方法的功能就是根据 BrokerName 和 BrokerID 找到对应的 BrokerAddress,然后把相关信息塞进 FindBrokerResult
对象中并返回。
BrokerName 的值来自 mq.getBrokerName
,这个没啥可说的,主要看 brokerID,也就是 this.recalculatePullFromWhichNode(mq)
。
public long recalculatePullFromWhichNode(final MessageQueue mq) {
// 这个不用看,这是用户指定
if (this.isConnectBrokerByUser()) {
return this.defaultBrokerId;
}
// 主从是否要切换就取决于 suggest
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (suggest != null) {
return suggest.get();
}
// 默认只访问 master
return MixAll.MASTER_ID;
}
从代码中我们可以看出,Consumer 是否切换 broker 就取决于 suggest 的值,(pullFromWhichNodeTable 中以 MessageQueue 为 key,suggest 为 value)。那么 suggest 的值是哪来的,在 processPullResult
中 this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
这一步。
//updatePullFromWhichNode
public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (null == suggest) { // 为空则创建,相当于初始化
this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
} else {
// 非空则 update BrokerID
suggest.set(brokerId);
}
}
也就是说 suggest 是由 Broker 反馈给 Consumer 的。那么我们去 Broker 看 suggest 如何生成。
前文提到了,大于 40% 时由主切从,小于 30% 由从切主,先来看这个值的初始化:
// BrokerStartup.class
// 物理内存最大占比, 40 就是 40%,Slave 是 30%
// master 40% 是切换 slave 的阈值
// slave 30% 是切回 master 的阈值
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
然后来看:
// DefaultMessageStore::getMessage
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
// 这里决定是否主从切换
getResult.setSuggestPullingFromSlave(diff > memory);
之后在 PullMessageProcessor::processRequest
中设置 consumer 拿到的 brokerID,如下:
// DefaultMessageStore::getMessage 就在这段代码前调用
// 根据 suggest 设置 brokerID
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
对 PullMessageProcessor::processRequest
做个简单说明,PullMessageProcessor
可以简单看作负责处理请求的类,processRequest
就是处理请求的方法。
元数据同步
注
本章节目录结构来自《RocketMQ 技术内幕》。
这里所说的元数据指的是,Broker 中存储的 topic、消费组、消费进度等信息。
前两个大章节介绍了 HA 的主从同步和主从切换,除了以上功能外,master 宕机时,虽然无法写入消息,但是消费短期内是不受影响的,consumer 依然可以从 slave 上进行消费。master 恢复后,slave 可以把“元数据”,比如集群模式的消费进度,主动同步回 master。
Slave 自动同步元数据
这个同步是单向的,只会是 slave 向 master 同步数据,这就是 broker 篇启动流程中的 BrokerController::handleSlaveSynchronize
方法。
实现方式就是通过定时任务,每 10s 与 master 同步一次数据。
private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 同步
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
具体同步哪些数据可以可根据代码自行查看:
public void syncAll() {
this.syncTopicConfig();
this.syncConsumerOffset();
this.syncDelayOffset();
this.syncSubscriptionGroupConfig();
}
消费进度同步
由于消费进度变化很快,只靠 10s 一次的同步显然有点慢,因此还有另外一种方式,就是通过 consumer 发来的 pullMessage 请求,如果其中包含的消费进度的信息,就直接拿来更新本地消费进度。
这部分代码也在 PullMessageProcessor::processRequest
,感兴趣自行查看。
总结
高可用也写完啦,撒花 😃
关于 RocketMQ 的文章至此也就暂时告一段落啦,后续会重新调整一下 Broker 篇的内容,以及纠错等等。
消息轨迹、ACL 啥的就不更啦。
未来关于 RocketMQ 的文章大概就是找个合适的时间把 5.0 的内容放上来。