跳至主要內容

RocketMQ 源码分析-消费消息

lament-z大约 22 分钟

RocketMQ 源码分析-消费消息

RocketMQ 面向的场景很多,不同场景对消息队列的要求是不一样的,尤其是在消费环节。

还是老样子,先介绍一些它的名词/基础概念,然后介绍消费相关的分类,最后跟着源码去看。

基础概念

这部分可以直接看官方文档,有图,官方 4.xopen in new window

消费模式

注意跟后面的 消息模式 MessageModel 进行区分。

推/拉两种模式都支持,本质上差别不大,你把 push 看成自动 pull 即可。

  • push
    在消息队列语境下,push 应该是服务端自动向 Consumer 推送消息。在 RocketMQ 语境下其实就是周期性的自动从 broker 拉取消息并存入 Consumer 本地队列,与此同时 Consumer 不断的从本地队列里取出消息进行处理。

  • pull
    Consumer 主动从服务端的消息队列中拉取消息。

无论哪种模式都要注意,既要避免消息在本地堆积,同时要注意消费间隔不能太长。

除了推拉之外,在 Consumer 消费消息这个行为这一层,还支持顺序消费和并发消费。

  • 顺序消费 ConsumeMessageOrderly
    顺序消费是在同一个队列上有序,并不是在 Topic 下全局有序,因为 Topic 通常都会有好几个队列。

    你可以通过设置 Topic 下只有一个队列来达成全局有序,牺牲队列本身的高可用,优势是不需要做额外的工作;也可以直接给消息加上全局自增的唯一ID的方式自行实现全局有序,就是实现方案比较麻烦,不过在分布式系统里你总能找到很多现成的功能拿来复用。

    顺序消费仅适用于集群模式。

  • 并发消费
    就是以并发的方式消费消息。

消费者 Consumer

可以从消息队列中消费消息的,都统一看作是消费者。 对于 Java 语境,RocketMQ 提供了 MQPushConsumerMQPullConsumer 两个接口以及对应的实现类,以便用户可以快速创建 Consumer,亦或者是自行实现消费客户端。

PUSH 模式的默认实现类:DefaultMQPushConsumer

PULL 模式的默认实现类:DefaultLitePullConsumer,另一个已弃用。

消费组 ConsumerGroup

消费组由订阅相同 Topic 的消费者组成。也就是说 ConsumerGroup 和 Topic 一样必须唯一,而且要和 Topic 一一对应,参考订阅关系。

MessageMode 消息模式

提示

消息模式为非官方译名

RocketMQ 提供了两种 MessageMode,集群模式和广播模式。

  • 集群模式 MessageModel.CLUSTERING
    集群模式下,对 RocketMQ 而言,任意一条消息,只要被消费组内任意 Consumer 消费成功,该消息就被视为已消费。

  • 广播模式 MessageModel.BROADCASTING
    广播模式下,对 RocketMQ 而言,任意一条消息,需要被消费组内所有 Consumer 至少消费成功 1 次,该消息才被视为已消费。

集群模式-负载均衡

集群模式下,消息消费的方式类似于“请求->网关->服务”的方式,所以在消息被消费之前可以加一层负载均衡来分配消息。

目前 RocketMQ 提供了六种分配策略,默认为:AllocateMessageQueueAveragely。

消费位点 (consuming point)

提示

中英都是官方名称

类似前几篇里我写的消费进度的概念,它代表客户端目前消费到队列中哪个位置。集群模式下 Consumer 会把消费位点提交给 Broker;广播模式下则由 Consumer 自行保存。

Consumer 启动时有三种初始消费位点的选择:CONSUME_FROM_LAST_OFFSETCONSUME_FROM_FIRST_OFFSETCONSUME_FROM_TIMESTAMP

初始消费位点的值存储于:DefaultMQPushConsumer.consumeFromWhere

订阅关系 && 订阅关系一致

消费者组订阅 Topic+Tag,叫订阅关系。

订阅关系一致:同一个消费者组下所有 Consumer 订阅的 Topic+Tag 必须完全一致。

如果订阅关系不一致,会导致消费错乱和消息丢失。

Consumer --n:1-- ConsumerGroup --1:1--> Topic + Tag。

不想画图,详见官方文档-4.x-订阅关系一致open in new window

源码分析

主线逻辑是从 DefaultMQPushConsumer 入手,介绍 push/pull 的实现,集群和广播模式下的具体细节等。

消费端的实现类

Consumer 跟 Producer 一样也是 Netty 客户端,相似逻辑不再解释。

从源码可以看到,DefaultMQPullConsumer 和其内部实现类 DefaultMQPullConsumerImpl 已经被标注 Deprecated,PULL 模式目前默认为 DefaultLitePullConsumer

我们从 DefaultMQPushConsumer 开始。

Consumer 启动流程

DefaultMQPushConsumer::start,这里面有一步设置消费组,重试组和 DLQ 命名空间的包装也会在这里进行,之后就是 defaultMQPushConsumerImpl.start();,如果开启消息追踪,那么还会启动 traceDispatcher.start();

然后来看 defaultMQPushConsumerImpl.start();

  • defaultMQPushConsumerImpl.start
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
    case CREATE_JUST:
        // ... 省略日志

        this.serviceState = ServiceState.START_FAILED;
        // 校验 Consumer 客户端配置,不清楚客户端配置的可以去看一眼
        this.checkConfig();

        // 这里做两件事
        // 1. 把 defaultMQPushConsumer 中的订阅复制一份给 rebalanceImpl
        // 2. 如果是 MessageModel 是集群模式,那么创建 retryTopic,并再一复制订阅关系,一起添加到 rebalanceImpl
        this.copySubscription();

        // InstanceName 校验,其实这一步可以放进 checkConfig()方法里
        // 如果你的 InatanceName 是默认值,将修改为 PID+纳秒级时间戳
        if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
            this.defaultMQPushConsumer.changeInstanceNameToPID();
        }

        // 跟 Producer 一样,创建 netty 客户端
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

        // 装配 rebalanceImpl ,关于 RebalanceImpl 后面再说,你知道他是个消费行为之前的负载均衡器即可
        this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
        this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
        this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
        this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

        // 拉取消息 API 的包装器
        // 无论pull or push 模式,都是 consumer 从 broker 上把消息拉回本地再进行处理
        this.pullAPIWrapper = new PullAPIWrapper(
            mQClientFactory,
            this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
        this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

        // 这里又来到一个 Deprecated 方法
        // 这部分就是前文提到的消费位点的存储
        if (this.defaultMQPushConsumer.getOffsetStore() != null) {
            this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
        } else {

            // 根据消费模式判定
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING: // 广播模式本地存取,这里只初始化 offsetStore
                    this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                    break;
                case CLUSTERING: // 集群模式,消费位点要上传 broker,所以这里要从 broker 上拉
                    // 这里初始化 offsetStore
                    this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                    break;
                default:
                    break;
            }
            // 给外部包装类初始化 offsetStore
            this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
        }

        // offsetStore 的具体类型取决于上面的代码, LocalFileOffsetStore or RemoteBrokerOffsetStore
        // LocalFileOffsetStore.load 就是从本地文件中取出消费位点
        // 路径是 user.home/.rocketmq_offsets/clientID/groupName/offsets.json
        // 流程就是 读文件,然后 json 序列化为 wrapper 对象,然后把对象的值传给 offsetTable,顺带打印 load 日志

        // 集群模式从 Broker 远程拉取,所以 RemoteBrokerOffsetStore::load 啥也不干。
        this.offsetStore.load();


        if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
          // 顺序消费 -> ConsumeMessageOrderlyService
            this.consumeOrderly = true;
            this.consumeMessageService =
                new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
        } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
          // 并发消费 -> ConsumeMessageConcurrentlyService
            this.consumeOrderly = false;
            this.consumeMessageService =
                new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
        }

        // ConsumeMessageOrderlyService 仅适用于集群模式
        // 它会提交一个周期任务给 scheduledExecutorService(单线程),scheduledExecutorService 每 20 秒调用 rebalanceImpl().lockAll() 对远程队列上锁。

        // ConsumeMessageConcurrentlyService
        // 提交一个周期任务给 cleanExpireMsgExecutors(单线程),每 15 分钟清理一次过期消息,避免消费卡住。
        // 后面单独来看这俩 Service
        this.consumeMessageService.start();

        // 就是把消费组和对应的 Consumer 存入 consumerTable
        boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
        if (!registerOK) {
          // 注册失败直接关闭
          // ...
        }

        // 主要看这个方法
        mQClientFactory.start();

        log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
        this.serviceState = ServiceState.RUNNING;
        break;
      case RUNNING:
      case START_FAILED:
      case SHUTDOWN_ALREADY:
          throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
              + this.serviceState
              + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
              null);
      default:
          break;
  }
  // 从 NameSrv 更新路由信息,并存入 topicRouteTable
  this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  // 发送远程请求 RequestCode.CHECK_CLIENT_CONFIG
  this.mQClientFactory.checkClientInBroker();
  this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  // 启动 rebalanceService
  this.mQClientFactory.rebalanceImmediately();
}
  • mQClientFactory.start();
      this.serviceState = ServiceState.START_FAILED;
      // If not specified,looking address from name server
      if (null == this.clientConfig.getNamesrvAddr()) {
          this.mQClientAPIImpl.fetchNameServerAddr();
      }
      // Start request-response channel
      // netty client
      this.mQClientAPIImpl.start();
      // Start various schedule tasks
      this.startScheduledTask();
      // Start pull service
      // 重点看 pullMessageService
      this.pullMessageService.start();
      // Start rebalance service
      // rebalanceService 主要是定期执行 doRebalance
      // doRebalance 有三种实现方法,对应不同 Consumer 实现类
      this.rebalanceService.start();
      // Start push service
      this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
      log.info("the client factory [{}] start OK", this.clientId);
      this.serviceState = ServiceState.RUNNING;          
  • 总结

到这里启动就完成了,简单总结一下启动流程:
第一步,初始化和组装 rebalanceImpl 和 Consumer ;
第二步,载入消费位点,广播模式存在本地,集群模式存储在 broker;
第三步,初始化并启动 consumeMessageService,实现类有两个,顺序消费是 ConsumeMessageOrderlyService,并行消费是 ConsumeMessageConcurrentlyService
第四步,把 Consumer 注册到 consumerTable; 最后启动 Consumer 和各种 Services。

然后我们来看“消费”。

ConsumeMessageService

消费这部分主要关注 ConsumeMessageServicePullMessageService这两块。

ConsumeMessageService 前面已经看到了,它有顺序消费和并发消费两个实现类,这个 Service 主要职责就是负责消费消息。

  • ConsumeMessageOrderlyService 顺序消费

    无论是哪个实现类,ConsumeMessageService 一定会有 consumeExecutor 属性,它是执行消费行为的线程/线程池,这里就统称执行器。private final ThreadPoolExecutor consumeExecutor;

    ConsumeMessageOrderlyServiceconsumeExecutor 是一个包含 20 个线程的线程池,并发版本的也一样。

    this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(), // core 20
            this.defaultMQPushConsumer.getConsumeThreadMax(), // max 20
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue, // workQueue LinkedBlockingQueue
            new ThreadFactoryImpl("ConsumeMessageThread_"));
    

    线程池有了,我们来看线程池要执行的任务: ConsumeRequest

    代码太长还是大量省略,除了要看 run 方法外,要注意创建 ConsumeRequest 时,需要接收两个队列作为参数, processQueuemessageQueue

    processQueue 暂且就简单理解为待消费的消息队列(local),messageQueue 就是待拉取的消息队列(remote)。

    // ConsumeRequest::run   
    public void run() {
      // 首先检查 processQueue.isDropped ...
    
      // 上锁,保证一个队列一次只被一个线程消费
        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized (objLock) {
        // 分支判断
        // 如果是广播模式 或者 processQueue 被锁住且锁未过期 则开始进行循环
          for (boolean continueConsume = true; continueConsume;) { // 当成 while(continueConsume) 即可
            // 再次 检查 processQueue.isDropped ,是则 break
            if(...){ log & break;}
            // 如果是集群模式 且 processQueue 未被锁住,则尝试重新上锁并重新消费
            if(...){
              // 这个方法就是 先对第一个参数 messageQueue 上锁,
              // 如果成功,则 10ms 后重新提交一个 ConsumeRequest 请求,
              // 如果不成功,则 3s 后重新提交一个 ConsumeRequest 请求。
              ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
              break
            }
    
            // 如果 集群模式 且 processQueue 锁过期,也是 tryLockLaterAndReconsume + break; 跟前面一样。
            if(...){...}
            // 计算真实提交间隔 interval  当前时间 - beginTime,注意 beginTime 在循环外
            long interval = System.currentTimeMillis() - beginTime;
            if (interval > MAX_TIME_CONSUME_CONTINUOUSLY /*60s*/) { // MAX_TIME_CONSUME_CONTINUOUSLY 是 rocketmq.client.maxTimeConsumeContinuously 的值。
                ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                break;
            }
            // 以上的四个 if + break 都是为了保证当前消息一定要被消费
    
            // 然后是消息的批量处理 consumeMessageBatchMaxSize 默认为 1
            // 从 processQueue.takeMessages 方法中就可以看出来,
            // 平时消息是存储于 msgTreeMap 中,只有顺序消费时才取出对应数量消息写入 consumingMsgOrderlyTreeMap
             List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
    
            // 创建并装配 ConsumeOrderlyContext 略
    
            // 这里就是你自己实现的 Consumer.registerMessageListener 里面
            // 注册并实现的 listener 的 consumeMessage 方法的调用。
            // 换句话说这一步就是执行真正的消费
            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
    
            // 后面全部略 有兴趣可以自己看,就是一些后续处理 ...
            // 退出循环的条件就是 continueConsume == false;
    
            // 消费的最后一步, ack 消息
             continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
          }
          // ...
        }
        // ...
    }
    

    由此我们可以知道顺序消费时,ProcessQueue 会从 msgTreeMap 中取出 batchSize(默认为 1)条消息存入 consumingMsgOrderlyTreeMap 并进行批量消费。

    目前执行器 consumeExecutor 和线程任务 ConsumeRequest 都有了,那么 ConsumeRequest 是如何提交给 consumeExecutor 的工作队列的呢?

    就是 ConsumeMessageOrderlyService::submitConsumeRequest 方法,非常简单,根据参数提供的俩 Queue 来创建一个 ConsumeRequest,然后直接提交。

    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }
    

    至此,围绕 consumeExecutor 的基本要素已经凑齐。

    接下来我们要来看消息是怎么从 broker 传递过来的,然后经过了哪些处理,最后如何被提交给 consumeExecutor 的。 换句话说就是:ConsumeMessageOrderlyService::submitConsumeRequest 这个方法是如何被调用的。

    为了方便理解,我们就不从 submitConsumeRequest 向上追踪了,读者可以自行从反推验证。

PullMessageService

还记得前面启动流程中注释了重点的 this.pullMessageService.start(); 么。

PullMessageService 是就是负责把消息从 Broker 拉取到 Consumer 本地,并将消息封装成 ConsumeRequest 提交给线程池的重要 Service 。

PullMessageService 内维护一个单线程执行器 scheduledExecutorService,负责定期把 PullRequest 提交到 pullRequestQueue 中。

同时 PullMessageService 自身也是一个服务线程 (ServiceThread, RocketMQ 内部自定义的抽象类),PullMessageService 的工作就是不停的从 pullRequestQueue 中取出 PullRequest 并执行 PullMessageService.pullMessage(final PullRequest pullRequest) 方法处理请求。

PullMessageService::pullMessage 实际上也并不会真正对请求进行处理,它只是选择一个 Consumer,通过调用 Consumer.pullMessage(final PullRequest pullRequest) 来处理。 可能是因为 DefaultMQPullConsumerImpl 被弃用的原因,这里它默认所有的 Consumer 都是 DefaultMQPushConsumerImpl 类型,(强制类型转换)。 因此也可以认为是 DefaultMQPushConsumerImpl::pullMessage 才是真正处理 PullRequest 请求的方法。 DefaultMQPushConsumerImpl::pullMessage 的主要工作就是从 broker 上拉消息回来,它里面的 callback 则是对拉回来的消息进行处理,比如解码啊、存入 msgTreeMap 啊、把消息提交给 consumeExecutor 去执行什么的。

  • PullMessageService 的属性和基础方法

    public class PullMessageService extends ServiceThread {
      // 存放 pullRequest 请求的队列
      private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
      private final MQClientInstance mQClientFactory;
      // 单线程、周期执行
      private final ScheduledExecutorService scheduledExecutorService = Executors
          .newSingleThreadScheduledExecutor(r -> new Thread(r, "PullMessageServiceScheduledThread"));
    
      // 构造方法
      public PullMessageService(MQClientInstance mQClientFactory) {
          this.mQClientFactory = mQClientFactory;
      }
    
      // PullMessageService 的 run 方法
      @Override
      public void run() {
          log.info(this.getServiceName() + " service started");
    
          while (!this.isStopped()) {
              try {
                  // 从 pullRequestQueue 中取出 pullRequest
                  PullRequest pullRequest = this.pullRequestQueue.take();
                  // 转交给 consumer.pullMessage 去执行
                  this.pullMessage(pullRequest);
              } catch (InterruptedException ignored) {
              } catch (Exception e) {
                  log.error("Pull Message Service Run Method exception", e);
              }
          }
    
          log.info(this.getServiceName() + " service end");
      }
      // ...
    }
    
  • pullRequest 存入 pullRequestQueue

    两个方法,executePullRequestImmediatelyexecutePullRequestLater

    public void executePullRequestImmediately(final PullRequest pullRequest) {
          try {
              this.pullRequestQueue.put(pullRequest);
          } catch (InterruptedException e) {
              log.error("executePullRequestImmediately pullRequestQueue.put", e);
          }
      }
    
    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
          if (!isStopped()) {
            // 就是定时调用 executePullRequestImmediately
              this.scheduledExecutorService.schedule(() -> PullMessageService.this.executePullRequestImmediately(pullRequest), timeDelay, TimeUnit.MILLISECONDS);
          } else {
              log.warn("PullMessageServiceScheduledThread has shutdown");
          }
      }
    
  • PullMessageService::pullMessage

    private void pullMessage(final PullRequest pullRequest) {
          // 从 MQClientInstance.consumerTable 中 以 consumerGroup 为 KEY 取出 consumer
          // 这个 consumer 就是之前 DefaultMQPushConsumerImpl::start 中注册那一步存入 consumerTable 的
          final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
          if (consumer != null) {
              // 强转
              DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
              // 实际处理
              impl.pullMessage(pullRequest);
          } else {
              log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
          }
      }
    
  • PullRequest

    看如何处理 PullRequest 请求前,先看一下它是什么。

    简单的说就是拉取消息的请求。

    // PullRequest 属性、重要重写 Override 方法
    public class PullRequest {
    
        private String consumerGroup;
        // 这俩 Queue 跟 ConsumeRequest 中语义一致
        // 代表远程 Queue
        // 它里面只存储 topic + brokerName + queueID
        // 这三个属性足够 Consumer 确定自己从哪个 Broker 找到哪个 topic 下的哪个队列
        private MessageQueue messageQueue;
        // 消息存入的本地 Queue
        // ProcessQueue 可以自己去看一下
        private ProcessQueue processQueue;
        // 远程 Queue 的 offset,消费进度,消费位点
        private long nextOffset;
        private boolean previouslyLocked = false;
    
    
        // hashcode:  (1*31 + consumerGroup.hashcode) * 31 + messageQueue.hashcode
        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
            result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
            return result;
        }
    
      @Override
      public boolean equals(Object obj) {
          if (this == obj)
              return true;
          if (obj == null)
              return false;
          if (getClass() != obj.getClass())
              return false;
          PullRequest other = (PullRequest) obj;
          if (consumerGroup == null) {
              if (other.consumerGroup != null)
                  return false;
          } else if (!consumerGroup.equals(other.consumerGroup))
              return false;
          if (messageQueue == null) {
              if (other.messageQueue != null)
                  return false;
          } else if (!messageQueue.equals(other.messageQueue))
              return false;
          return true;
      }
    
    }
    

    它的 hashCode 计算方式类似 String.hashCode,基本上也是基于以下公式:

     f(xn+1)=31f(xn)+xn+1

    n 是迭代次数,xn 是变量,需要计算几次就迭代几次

    顺便提一句,看到 Override hashCode() 和 equals(),那么大概率他是要在某个数据表里作为 key 使用了。

  • DefaultMQPushConsumerImpl::pullMessage

    这个方法也很长,老样子删减,只看主要逻辑,这里我把 pullMessage 的代码分成了三个部分。 前面控流的部分可以不看,中间 callback 要看一下,核心内容在第三段。

    下面控流这一段可以不看,可全部跳过。

    // DefaultMQPushConsumerImpl::pullMessage
    // get ProcessQueue
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    // 控流
    // 本地缓存消息数量 processQueue.getMsgCount > 1000 触发控流
    // 间隔为 PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50ms
     if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
       // 控流的处理方式就是前面贴过的 PullMessageService().executePullRequestLater
       // 实际上就是把 pullRequest 重新 put 回 pullRequestQueue
       this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
       // log ...
       // 然后直接 return
       return;
     }
    
     // 本地缓存消息 > 100 Mb 触发控流
     // 处理方式同上 代码略
     if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {...}
    
     // 并发消费控流
     // maxSpan > 2000 处理方式同上
     if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {...}
    
     // 顺序消费
     // 不算是控流、不过处理方式一样  
     if (processQueue.isLocked()) {
         // pullRequest.isPreviouslyLocked 默认为 false
        if (!pullRequest.isPreviouslyLocked()) { // 首次 pullMessage 会进入这里
            long offset = -1L;
            try {
              // 根据默认的初始消费位点(见基础概念-消费位点部分) 计算实际消费位点
              // 这个可以自己去看代码,或者看书也行
                offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
            } catch (Exception e) {
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                return;
            }
            // broker 异常 or readFile 异常时会为 true,也就是 offset 为 -1
            // offset 如果正常会后面会设置为 pullRequest.nextOffset 的值
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                pullRequest, offset, brokerBusy);
            if (brokerBusy) {
                log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                    pullRequest, offset);
            }
    
            pullRequest.setPreviouslyLocked(true);
            pullRequest.setNextOffset(offset);
        }
    } else {
        // pullTimeDelayMillsWhenException 默认 3000ms
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.info("pull message later because not locked in broker, {}", pullRequest);
        return;
    }
    

    然后来看第二部分,一个老长的 callback,主要看 onSuccess 方法,它是给 consumer 发起异步请求成功时用的。主要功能就是把拉回来的数据解码为 msgList 然后存入 msgTreeMap 以便消费,最后调用 consumeMessageService.submitConsumeRequest 提交复制消费的执行器执行。

    // DefaultMQPushConsumerImpl::pullMessage  
    
    PullCallback pullCallback = new PullCallback() {
    
      @Override
      public void onSuccess(PullResult pullResult){
        // 对从 broker 上拉回来的消息进行解码过滤等处理。
        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
            subscriptionData);
    
        switch (pullResult.getPullStatus()) {
            case FOUND: // FOUND 就是代表 从 Broker 拉回了新消息
                long prevRequestOffset = pullRequest.getNextOffset();
                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
                // 状态信息的处理 略 ...
    
                long firstMsgOffset = Long.MAX_VALUE;
                // MsgFoundList 为空,立刻再提交一个 pullRequest 到队列中
                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                } else {
                    // 拉回的第一条消息的 offset
                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                    // 状态处理,略
    
                    // 就是存入 msgTreeMap ,顺便同步维护 ProcessQueue 的属性
                    // 只要 msgTreeMap 不为空,且 ProcessQueue 不处于 消费中的状态,就返回 true
                    // 其他情况则返回 false
                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                    // 然后就来到了 consumeMessageService.submitConsumeRequest
                    // 也就是前文讲的 创建 ConsumeRequest 并提交给 consumeExecutor 去执行。
                    // 这方法有两种实现,顺序消费/并发消费
                    // dispatchToConsume 参数仅作用于顺序消费,如果是 false 则不会执行
                    // 并发消费无所谓顺序,所以 dispatchToConsume 参数就没啥用
                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                        pullResult.getMsgFoundList(),
                        processQueue,
                        pullRequest.getMessageQueue(),
                        dispatchToConsume);
    
                    // 这个 PullInterval 拉取间隔默认是0 ,如果大于0了自然就是等会儿再去拉取。
                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                    } else {
                        // PullInterval == 0 就立即拉取
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    }
                }
    
                if (pullResult.getNextBeginOffset() < prevRequestOffset
                    || firstMsgOffset < prevRequestOffset) {
                    log.warn(
                        "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                        pullResult.getNextBeginOffset(),
                        firstMsgOffset,
                        prevRequestOffset);
                }
                break;
            case NO_NEW_MSG: // 没有新消息
            case NO_MATCHED_MSG: // 没有匹配的消息
                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                // 更新 offset,本地 or local 两种方式
                DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                break;
            case OFFSET_ILLEGAL: // 偏移量非法
                log.warn("the pull request offset illegal, {} {}",
                    pullRequest.toString(), pullResult.toString());
                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                pullRequest.getProcessQueue().setDropped(true);
                DefaultMQPushConsumerImpl.this.executeTaskLater(() -> {
                  try { // 重新计算并更新偏移量,移除当前 processQueue
                        // 这部分就不展开了
                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),pullRequest.getNextOffset(),false);
                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
    
                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
    
                        log.warn("fix the pull request offset, {}", pullRequest);
                    } catch (Throwable e) {
                        log.error("executeTaskLater Exception", e);
                    }
                }, 10000);
                break;
            default:
                break;
        }
      }
    
      @Override
      public void onException(Throwable e){
        ...
      }
    }
    

    最后是第三部分,对 broker 发起拉取消息请求:

    // 3rd part
      boolean commitOffsetEnable = false;
      long commitOffsetValue = 0L;
      // 读取消费位点,分顺序/集群两种实现,不过集群也是从本地内存里读
      if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
          commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
          if (commitOffsetValue > 0) {
              commitOffsetEnable = true;
          }
      }
    
      String subExpression = null;
      boolean classFilter = false;
      // 从 rebalanceImpl 中根据 topic 拿到订阅数据
      SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
      if (sd != null) {
              // isPostSubscriptionWhenPull 是否每次 pull 的时候都更新订阅关系 ,默认 false
          if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
              subExpression = sd.getSubString();
          }
    
          classFilter = sd.isClassFilterMode();
      }
    
      int sysFlag = PullSysFlag.buildSysFlag(
          commitOffsetEnable, // commitOffset
          true, // suspend
          subExpression != null, // subscription
          classFilter // class filter
      );
      try {
          // 最后 netty client 发起拉消息的请求,这就不展开了
          this.pullAPIWrapper.pullKernelImpl(
              pullRequest.getMessageQueue(),
              subExpression,
              subscriptionData.getExpressionType(),
              subscriptionData.getSubVersion(),
              pullRequest.getNextOffset(),
              this.defaultMQPushConsumer.getPullBatchSize(),
              sysFlag,
              commitOffsetValue,
              BROKER_SUSPEND_MAX_TIME_MILLIS,
              CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
              CommunicationMode.ASYNC,
              pullCallback
          );
      } catch (Exception e) {
          log.error("pullKernelImpl exception", e);
          this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
      }
    

负载均衡

rebalance 这一层从理论模型角度看是加在消费行为之前的,主要针对集群消费模式。

实际实现方式是,负载均衡器每隔 20s 对远程 Queue 上锁,然后执行 doRebalance,最后释放锁。

换句话说就是消费组内的每个 consumer 都只管从 processQueue 中取出消息进行消费,而负载均衡器则利用分配策略为 consumer 分配他应该消费哪些消息。

  • 基本组成

RebalanceImpl 是个抽象类,有三个子类,RebalanceLitePullImplRebalancePullImplRebalancePushImpl。 很明显就是对应三种 consumer 实现类。 以 Push 模式为例,负载均衡器内部维护如下三个数据表:

protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
    new ConcurrentHashMap<String, Set<MessageQueue>>();
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
    new ConcurrentHashMap<String, SubscriptionData>();

四条属性,在 consumer 启动阶段初始化(实际上 subscriptionInner 也会初始化):

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

构造方法:

public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
      this(null, null, null, null, defaultMQPushConsumerImpl);
  }

负载均衡的实现和 consumer 的实现要对应。

延迟解锁的值默认为 20s,也就是一个负载均衡器远程上锁的周期:

private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));

负责执行负载均衡任务的是 RebalanceService

它的工作就是 20s 执行一次负载均衡:

// RebalanceService::run
while (!this.isStopped()) {
   this.waitForRunning(waitInterval);
   this.mqClientFactory.doRebalance();
}
  • 负载均衡操作 RebalanceImpl::doRebalance
// RebalanceImpl::doRebalance  

// isOder 在 Pull 模式为 false,Push 模式根据 consumer.isConsumeOrderly 判断
public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                // this one
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    // 从 processQueueTable 中剔除订阅关系错误的消息
    this.truncateMessageQueueNotMyTopic();
}

然后来看 RebalanceImpl::rebalanceByTopic

private void rebalanceByTopic(final String topic, final boolean isOrder) {
     switch (messageModel) {
         case BROADCASTING: { // 广播模式跟负载均衡这个概念其实没啥关系
              // 略 ...
             break;
         }
         case CLUSTERING: { // 主要看集群模式  
           // topic 下的 MessageQueue 集合
             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
             // cidAll 就是 topic 对应的 消费组下的所有 consumer 的 id 列表
             List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
             if (null == mqSet) {
                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                 }
             }
             if (null == cidAll) {
                 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
             }

             if (mqSet != null && cidAll != null) {
                 List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                 mqAll.addAll(mqSet);

                 // 排序,保证每个 consumer 上都是一致的。
                 Collections.sort(mqAll);
                 Collections.sort(cidAll);

                 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                 List<MessageQueue> allocateResult = null;
                 try {
                    // 根据不同策略来分配
                     allocateResult = strategy.allocate(
                         this.consumerGroup,
                         this.mQClientFactory.getClientId(),
                         mqAll,
                         cidAll);
                 } catch (Throwable e) {
                     log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                         e);
                     return;
                 }

                 Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                 if (allocateResult != null) {
                     allocateResultSet.addAll(allocateResult);
                 }
                 // 遍历 processQueueTable,根据 allocateResultSet 剔除无效队列,添加新队列失败时,重新计算消费位点并重拉消息。
                 // 只要对 processQueueTable 进行修改就会返回 true ,nothing touched return false
                 // 通常是消费组内的 consumer 数量发生变化,然后负载均衡器就要给 consumer 重新计算它现在要负载消费哪个队列
                 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                 if (changed) {
                     log.info(
                         "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                         strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                         allocateResultSet.size(), allocateResultSet);
                     this.messageQueueChanged(topic, mqSet, allocateResultSet);
                 }
             }
             break;
         }
         default:
             break;
     }
 }

可以看到广播模式就是直接执行 updateProcessQueueTableInRebalance,集群模式则是先执行 rebalance,然后再执行 updateProcessQueueTableInRebalance。 说白了广播模式不存在负载均衡这个概念。

  • AllocateMessageQueueStrategy 负载均衡策略

    目前一共六种负载均衡策略,默认的是 AllocateMessageQueueAveragely,这里就不罗列了,有兴趣可以自己去看。顺便自行探索为什么文档和最佳实践都建议消费者数量要比队列数量少。

  • 总结

    可以看出,rebalance 是一个集群模式下的,基于队列和消费者数量的负载均衡。 它把队列和消费者进行了一对一映射,然后每个消费者按 pullMsg --> 提交消费位点 --> 持久化消费位点的流程处理消息,pullMsg 时不提交消费状态。 为了避免消息被多个消费者重复消费,每个队列仅被一个消费者消费。

    并且要注意,由于 rebalance 是单独一个线程每 20s 进行一次分配,当队列数、消费者数发生变化时,重复消费是有可能发生的。

    注意这里都是 4.x 版本,5.x 版本负载均衡这块有重构,而且提供了消息粒度的负载均衡。

消息确认

无论是消费成功还是失败,亦或者其他情况,consumer 都会有“消费结果”,根据消费结果状态的不同,consumer 会进行不一样的处理(见 consumer.processConsumeResult()),比如 RECONSUME_LATER 就会把消息发回 broker 从而延迟消费。这个流程叫消息确认,或 SendMsgBack,或 ack 消息。

这部分代码就不展开分析了,本质就是 netty client 给 netty server 发请求,具体来说就是 MQClientAPIImpl::consumerSendMessageBack 方法,请求码为 RequestCode.CONSUMER_SEND_MSG_BACK,然后按照 rocketmq 自定义的请求协议封装好 requestHeader、body,encode,发送请求。 然后发送请求依然是有同步异步两种方式,不再赘述。

细节部分如果有兴趣,一方面你可以去查书 (《RocketMQ 技术内幕》),另一方面你可以去看内部协议的实现,个人建议两种方式结合,毕竟缺乏注释和细节文档的代码有个参考书还是好的。

总结

消息消费就到这里结束啦,已经太长了,又不是写书。没有特意讲解 pull 模式和并发消费的流程,不过大同小异。比如 Pull 模式的 consumer 实现,哪怕你不去看代码现在也应该能推测出来,它跟 DefaultMQPushConsumer 的区别就是它是自己主动去拉取消息,换句话说它不需要 PullMessageService,其他几乎都一样。ConsumeMessageConcurrentlyService 也是同理,它不需要保证顺序,所以直接消费就可以了,不需要像顺序消费一样从 consumingMsgOrderlyTreeMap 里一个个取,一个个提交。

接下来我们会回到 rocketmq-store,来看看 RocketMQ 的高可用也就是 HA 的实现。

上次编辑于:
贡献者: Lament