跳至主要內容

RocketMQ 源码分析-NameSrv

lament-z大约 14 分钟

RocketMQ 源码分析-NameServer

NameSrv 比较简单,这是由它的设计决定的。 本文会以自顶向下的方式来介绍 NameSrv 的角色和功能。

概述

RocketMQ 是一个分布式的消息队列,分布式系统自然需要一个模块来负责“路由管理”、“服务注册与发现”。 NameSrv (NameServer) 就是负责这部分的模块。

前文(发送消息open in new window) 中提到了 Client 发送消息时会去 Namesrv 拉数据(updateTopicRouteInfoFromNameServer), 确认发给哪个 Broker 的哪个 Queue。这部分就类似路由功能。

Broker 启动时会把自身注册到 NameSrv,如果是 NameSrv Cluster,那就注册到所有到 NameSrv 上。NameSrv 与 Broker 之间保持长连接,10秒检测一次 Broker 是否存活,当检测的 Broker 挂掉时则从自身的路由注册表中删除该 Broker。这部分就类似“服务注册发现”。

NameSrv 与常见的“服务注册发现”不太一样的地方是,它的各个节点之间并不会互相通信,就比如无论是 Eureka 还是 nacos ,他们是会相互同步信息最终数据一致的,但是 NameSrv 并不会,这是 RocketMQ 的特点之一,虽然可能会导致短暂的发送到各个 Broker 的消息不均匀,但这是可以接受的。

Namesrv 的基本组成介绍

Namesrv 本身是个基于 netty 的服务端。

NamesrvController 负责初始化 netty 服务器,以及启动各种定时任务。

processor 包里是各种事件的 Handler(netty 语境)。

BrokerHousekeepingService 这个说是 Service ,本质是个 ChannelEventListener,只不过他监听的 Channel 状态大部分与清理无效 Broker 相关。

RouteInfoManager 主要就是维护以下五个数据表:

 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

可以看到,Broker / BrokerCluster / Topic 等信息都由 RouteInfoManager 进行管理。

前文中提到由于 NameSrv 集群中,各个 NameSrv 之间并不会相互通信,因此会有短暂的路由信息不一致,从而导致消费发送存在少量的不均匀。

注意

NameSrv 的路由注册表发生变化时并不会通知消息生产者(后简称P),如果实际场景有要求,P 端需要在发送消息时提供容错来保证消息发送成功。

源码分析

对 NameSrv 有了大体认识之后我们来挨个看细节。

启动流程

实际就是启动一个 NettyServer,大致流程为: NamesrvStartup.class 是启动类 -> main() -> main0() -> 创建 NamesrvController 的对象,启动该对象.

main0() 里会先创建一个 NamesrvController 的对象(其实就是自定义的 NettyServer)。 这个过程中会处理命令行参数/配置文件啥的,就把各种需要的参数/配置都准备好。

  // main0
  NamesrvController controller = createNamesrvController(args);

创建好 controller 之后接下来就启动它,也就是 start(controller);

这里会先执行 controller.initialize() 进行初始化工作,加载准备好的配置信息啦、创建 NettyServer 对象啦、创建线程池、启动定时任务等等,全部成功之后 controller.start(); 才真正启动服务器。

检测心跳

每 10s 扫描一次 brokerLiveTable,如果发现过期的 broker,则删除。这个是controller.initialize()初始化时启动的定时任务之一。

brokerLiveTable 是前面说的 RouteInfoManager 管理数据表之一,数据结构为 HashMap ,Key 是 broker 的地址,Value 是 Broker 对应的信息。

HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

每 10s 扫描一次 brokerLiveTable 的工作就是这里处理的,它单独起个线程 10 秒扫描一次(下方源码)。

// scheduledExecutorService
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
      "NSScheduledThread"));

// 为了方便阅读 我这里改成了 lambda 表达式
this.scheduledExecutorService.scheduleAtFixedRate(() -> NamesrvController.this.routeInfoManager.scanNotActiveBroker(), 5, 10, TimeUnit.SECONDS);

broker 默认是 30s 向 NameSrv 发一次心跳包。
NameSrv 10 秒扫一次它维护的 brokerLiveTable。

扫描时对 Broker 进行判定,判断 broker 是否存活的逻辑:上次收到心跳包的时间 + 120s < 系统当前时间。 翻译一下其实就是:2分钟没收到过 broker 发来的心跳包就认为这个 broker 挂了,代码如下:

  // last 来自 BrokerLiveInfo.lastUpdateTimestamp,aka,上次收到心跳包的时间
  // BROKER_CHANNEL_EXPIRED_TIME = 120s,也就是 broker_Channel 过期时间
  (last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()

最后来看看他是如何删除 broker 信息的,RouteInfoManager::scanNotActiveBroker

  // RouteInfoManager::scanNotActiveBroker
  Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();

  while (it.hasNext()) {
      Entry<String, BrokerLiveInfo> next = it.next();
      long last = next.getValue().getLastUpdateTimestamp();
      if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
        // 先尝试主动关闭 channel,内部逻辑就是 netty 的 closeChannel,添加了一个 listener 监控,成功关闭时打印 log
        // 注意这里的 Channel 是 netty 定义的(io.netty.channel.Channel),不是 Java NIO 那个
          RemotingUtil.closeChannel(next.getValue().getChannel());
        // 从 brokerLiveTable 中删除过期 broker 信息
          it.remove();
          log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
        // 从方法命名看应该是 Channel 关闭后触发的方法
        // 正常使用方式应该是通过 BrokerHousekeepingService 定义的监控去调用,
        // 类似前面给 close 方法加个 Listener,监控关闭成功,然后打印关闭日志。
        // 然鹅这里不知道为何就这么直接调用了,不过这条流程本来就写的很成问题,不必深究
        // 这个方法是从 RouteInfoManager 维护的五个数据表里删除过期 broker 的对应信息。
          this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
      }
  }

RouteInfoManager##onChannelDestroy方法主要功能就是前面注释里说的从 RouteInfoManager 维护的五个数据表里删除过期 broker 的对应信息,这个方法正常的调用方式是通过 BrokerHousekeepingService 这个 ChannelEventListener 注册到 channel 的不同状态上去调用,比如:

public void onChannelClose(String remoteAddr, Channel channel) {
      this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
  }

而不应该像 RouteInfoManager::scanNotActiveBroker 里面那样直接调用,源码里也就这么一处是直接调用。 然后你自然会发现,这一处不规范导致多了一堆迷一样的判定,代码可读性迅速降低。

  public void onChannelDestroy(String remoteAddr, Channel channel) {
      String brokerAddrFound = null;


      // 遍历 brokerLiveTable,根据 channel 找 brokerAddr
      if (channel != null) {
          try {
              try {
                // 加读锁 这里的 this.lock 就是 JUC 提供的可重入读写锁 ReentrantReadWriteLock
                  this.lock.readLock().lockInterruptibly();
                  // 迭代器
                  Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                      this.brokerLiveTable.entrySet().iterator();
                  // 开始遍历
                  while (itBrokerLiveTable.hasNext()) {
                      Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                      // 根据 channel 找对应的 brokerAddr 信息
                      if (entry.getValue().getChannel() == channel) {
                        // 把找到的 brokerAddr 存到 brokerAddrFound 后面要用
                          brokerAddrFound = entry.getKey();
                          break;
                      }
                  }
              } finally {
                  this.lock.readLock().unlock(); // 取消读锁
              }
          } catch (Exception e) {
              log.error("onChannelDestroy Exception", e);
          }
      }
      // 无法根据 channel 找到 brokerAddr 时,则直接使用 remoteAddr 参数
      // remoteAddr 参数 就是要删除的 brokerAddr

      if (null == brokerAddrFound) {
          brokerAddrFound = remoteAddr;
      } else {
          log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
      }

      // 从五个数据表 中删除 broker 的相关信息。
      if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

          try {
              try {
                  this.lock.writeLock().lockInterruptibly(); // 写锁
                  // 从 brokerLiveTable 中删除
                  this.brokerLiveTable.remove(brokerAddrFound);
                  // 从 filterServerTable 中删除
                  this.filterServerTable.remove(brokerAddrFound);

                  String brokerNameFound = null;
                  boolean removeBrokerName = false;
                  Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                      this.brokerAddrTable.entrySet().iterator();

                  while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                      // brokerData 里存的是 broker 主从集群的信息
                      BrokerData brokerData = itBrokerAddrTable.next().getValue();
                      // 主从集群的迭代器,Long 对应 brokerID, String 对应 brokerAddr
                      Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();

                      // 从主从集群里删除无效 broker
                      while (it.hasNext()) {
                          Entry<Long, String> entry = it.next();
                          Long brokerId = entry.getKey();
                          String brokerAddr = entry.getValue();
                          if (brokerAddr.equals(brokerAddrFound)) {
                              brokerNameFound = brokerData.getBrokerName();
                              it.remove();
                              log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                  brokerId, brokerAddr);
                              break;
                          }
                      }

                      if (brokerData.getBrokerAddrs().isEmpty()) {
                        // broker 主从集群为空的话,设置 removeBrokerName 为 true,后面还要根据这个标记来判断是否要删除对应的 Topic
                          removeBrokerName = true;
                          itBrokerAddrTable.remove();
                          log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                              brokerData.getBrokerName());
                      }
                  }

                  // 跟前面区别不大,这里是从 clusterAddrTable 中删除 cluster 以及这个 cluster 包含的 broker
                  if (brokerNameFound != null && removeBrokerName) {
                      Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                      while (it.hasNext()) {
                          Entry<String, Set<String>> entry = it.next();
                          String clusterName = entry.getKey();
                          Set<String> brokerNames = entry.getValue();
                          boolean removed = brokerNames.remove(brokerNameFound);
                          if (removed) {
                              log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                  brokerNameFound, clusterName);

                              if (brokerNames.isEmpty()) {
                                  log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                      clusterName);
                                  it.remove();
                              }

                              break;
                          }
                      }
                  }

                  // 从 topicQueueTable 中删除 对应的 topic
                  if (removeBrokerName) {
                      Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                          this.topicQueueTable.entrySet().iterator();
                      while (itTopicQueueTable.hasNext()) {
                          Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                          String topic = entry.getKey();
                          List<QueueData> queueDataList = entry.getValue();

                          Iterator<QueueData> itQueueData = queueDataList.iterator();
                          while (itQueueData.hasNext()) {
                              QueueData queueData = itQueueData.next();
                              if (queueData.getBrokerName().equals(brokerNameFound)) {
                                  itQueueData.remove();
                                  log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                      topic, queueData);
                              }
                          }

                          if (queueDataList.isEmpty()) {
                              itTopicQueueTable.remove();
                              log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                  topic);
                          }
                      }
                  }
              } finally {
                  this.lock.writeLock().unlock();
              }
          } catch (Exception e) {
              log.error("onChannelDestroy Exception", e);
          }
      }
  }

10分钟打印一次配置信息

就是打印 KVConfigManager 管理的配置信息。 略。

RouteInfoManager

Namesrv 承担着类似服务发现/路由的功能,RouteInfoManager 这个类就可以看作是路由功能的实现。

五个数据表介绍

前面说了,它维护了五个数据表:

 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

简单了解一下 RocketMQ 的基本概念,可以帮助理解 RouteInfoManager 维护了些什么东西

RocketMQ 主要基于订阅/发布机制,一个 topic 可以拥有多个 Queue,一个 Broker 默认情况下会为每个 topic 创建 4个 readQueue 和 4个 writeQueue。

多个 brokerName 相同的 broker 可以组成一个主从集群,这个主从集群的集群名存储在 cluster 中。 主从 broker 之间通过 brokerID 进行区分,一个 broker 的 brokerID = 0l,则为主节点;brokerID > 0l 时为从节点。

接下来分别看一下这五个数据表。

topicQueueTable

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

这个数据表里存储的是每个 topic 和自身所对应的 broker + queue 的信息列表。 换句话说这个是 topic 的路由表,发送消息时会根据这个表进行负载均衡。

// QueueData
 private String brokerName;
 private int readQueueNums;
 private int writeQueueNums;
 private int perm; // 读写权限
 private int topicSysFlag; // topic 同步标记
  • brokerAddrTable

    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

    这个里面存的是 BrokerName 和它对应的 broker 主从集群信息。
    如果一个 broker 的 brokerId 为 0L,那它就是主broker。

    // BrokerData
      private String cluster;
      private String brokerName;
      private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    
  • clusterAddrTable

    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

    这个就是集群路由表,clusterName 和属于该集群的所有 broker 的 brokerName 集合一一映射。

  • brokerLiveTable

    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

    brokerAddr 和其对应的存活信息(BrokerLiveInfo)。
    每次 Namesrv 收到 broker 发来的心跳包后都会根据 brokerAddr 来更新对应的 BrokerLiveInfo。

  • filterServerTable
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    这个是 brokerAddr 与 FilterServer列表的对应关系。

注册/更新 Broker 信息

Broker 每隔30秒向 Namesrv 集群发送一次心跳包(启动时也会发送一次),Namesrv 收到心跳包后会去更新对应的 BrokerLiveInfo,具体的说是更新上一次收到该 broker 发来心跳包的时间戳(BrokerLiveInfo.lastUpdateTimestamp)。 同时 Namesrv 会每隔10秒扫描一次整个 brokerLiveTable,如果任意 broker 的 lastUpdateTimestamp 超过120秒没有更新过则从所有数据表中删除该 Broker 的信息,同时关闭 Channel (Netty语境),或者说是关闭 Socket 连接。

  • Broker 发送心跳包

    这部分源码分析放到 Broker 章节,其实这部分如果熟悉 Netty 的话就挺简单的,就是发个请求,不熟悉的话最好先看看 Netty。

  • Namesrv 处理心跳包

    由于 RocketMQ 是基于 Netty 来进行网络通信的,而对于 Netty 来说,所有的“请求”都是 Socket 传入的数据包。因此要区分不同“请求”,通常的做法是发送数据的时候额外携带“请求类型”, 通过 Decoder 解码数据后再判定是什么类型的"请求"。

    客户端发送请求时候将请求类型+请求参数一起通过编码器(Encoder)编码为 byte 数组(或者理解为二进制数据,就像 Redis 那样,这样当 NettyServer 收到数据包时,先通过解码器(Decoder)解码为 Java 对象,然后就可以判断请求类型了。

    这个过程也是通讯协议从设计到实践的过程,有兴趣可以在自己的 netty demo 里面先自定义一个协议,然后手动实践一下。

    这里提到的 Encoder/Decoder 以及后续判断请求类型这些在 Netty 语境都是不同的 handler。

    RocketMQ 的习惯是把 handler 都放在 processor 包下面,所以有了上面的基础知识我们就知道去哪里找 Namesrv 处理心跳包的逻辑了。

    org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor就是我们要找的类。

    这个类也简单,可以先大概浏览一遍,就是processRequest()方法里面通过 switch (request.getCode())来判断请求类型,然后根据请求类型转发给后续处理方法。

    org.apache.rocketmq.common.protocol.RequestCode里面定义了所有的请求类型CODE。 我们这里要关注的是 Broker 发来的注册请求,所以你搜 BROKER 或者 REGISTER 就可以了。

    case RequestCode.REGISTER_BROKER:
         Version brokerVersion = MQVersion.value2Version(request.getVersion());
         if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
         } else {
            return this.registerBroker(ctx, request);
         }
    

    可以看到它提供了两个注册方法,一个是带 FilterServer 模式的,一个是普通的注册。 这里只分析this.registerBroker(ctx, request);

    
    // 首先这里的入参不用管,就是 netty 中典型的 handler 方法,看过 netty 自然秒懂
    public RemotingCommand registerBroker(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    
        // 从这里开始跳过 都是 netty 构造数据包的内容
        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
        final RegisterBrokerRequestHeader requestHeader =
            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
    
        if (!checksum(ctx, request, requestHeader)) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("crc32 not match");
            return response;
        }
    
        TopicConfigSerializeWrapper topicConfigWrapper;
        if (request.getBody() != null) {
            topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
        } else {
            topicConfigWrapper = new TopicConfigSerializeWrapper();
            topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
            topicConfigWrapper.getDataVersion().setTimestamp(0);
        }
        // 以上都不用看,编码解码校验请求,不懂的先去看 Netty
    
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            topicConfigWrapper,
            null,
            ctx.channel()
        );
        // 这里开始往后也不用看,就是构造 response 的数据包
        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());
    
        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    

    所以你看DefaultRequestProcessor就是转发了一下注册请求,真正处理注册逻辑的是RouteInfoManager::registerBroker。 下面来看这部分代码:

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
              // RouteInfoManager 中维护的五个数据表都是 HashMap
              // 并且通过前面的代码我们知道会有一个线程定期扫描并剔除无效 broker,五个表都有涉及。
              // 所以修改前一定要加锁防止并发问题
                this.lock.writeLock().lockInterruptibly();
    
              // --------------  处理 clusterAddrTable ---------------------
                // 先判断 发送请求的 Broker 所属集群是否存在,如果不存在就添加集群
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);
    
              // --------------  clusterAddrTable 完成 --------------------
    
    
              // -------------------- 处理 brokerAddrTable --------------------
    
                boolean registerFirst = false;
                // 先确认 brokerName 对应的 brokerData 是否存在,如果不存在则创建。
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                  // 这里标记为首次注册,因为当 brokerAddrTable 里连 BrokerData 都没有,说明这个主从集群里一台 broker 都没有
                    registerFirst = true;  
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
    
                /*
                注册过程中,如果发起注册请求的 broker 是已经存在于 brokerAddrTable 的 slave broker,
                但是请求参数 brokerID 为0的话,会发生主从交换。
                实际上交换不太准确,类似晋升,因为这时候 master 已经挂了,slave broker 现在作为 master 来重新注册而已。
                因此处理逻辑只需要先删除掉原来的 slave broker 信息,然后重走一次注册流程即可。
                */
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }
    
                // 再次判断是否为首次注册
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                // oldAddr == null 就回到了前面第一次标记的情况
                // oldAddr != null 那说明有新的 broker 代替了该 brokerID 对应的旧 broker,这是第二种首次注册的情况
                registerFirst = registerFirst || (null == oldAddr);
    
                // Broker 为 主节点,并且该 Broker 对应的 topicConfig 信息发生变化(包括首次注册)
                // 这个情况下,需要创建/更新该 Broker 对应的 Topic 信息。
                // 说白了就是维护 topicQueueTable 中与该 Broker 相关的信息。
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                              // 这个方法就不详细展开了,其内容就是
                              // 1. 利用 brokerName 和 topicConfig(aka entry.getValue())来组装 queueData
                              // 2. topicQueueTable 利用 topicConfig.getTopicName() 来判断该 topic 的信息是否存在
                              // 3. 不存在就把这个 topic 注册进来,存在就更新  
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
                // 这里就是之前心跳包时候说的,更新 brokerLiveTable
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {// 首次注册
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }
    
                // filterServer 模式
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
    
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }
    
        return result;
    }
    

注销/删除 Broker 信息

Namesrv 注销 broker 分两种情况:

  1. 120秒没收到 broker 发来的心跳包,自动删除,这个前面分析过了。

  2. Broker 正常关闭时会给 Namesrv 发出注销请求(unregisterBroker)。

有兴趣的话可以按照之前分析 broker 注册的思路在 Namesrv 的代码中找到如下方法自行分析。
public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request)

两种情况下,Namesrv 的处理逻辑是一致的,都是从RouteInfoManager 维护的数据表中删除/更新与该 broker 相关的信息, 唯一的区别就是 broker 正常注销的话,Namesrv 不会主动关闭 channel,因为他要给 Broker 返回注销成功的 response。

上次编辑于:
贡献者: Lament