跳至主要內容

RocketMQ 源码分析-发送消息

lament-z大约 14 分钟

RocketMQ 源码分析-发送消息

这里开始详细聊聊 RocketMQ 发送消息的各种细节。

前面在简介里面说了,从使用者的角度,Producer 看起来就是直接往 Topic 里面塞消息。而实际上呢,要先去 Namesrv 找到 Topic 的路由信息,然后才能确定 Topic 是由哪个 Broker 管理。

发送消息支持三种方式,同步、异步、单向,区别很简单不再赘述,下文以同步为例进行分析。

前置要求

这里就假设读者已经顺利完成了 quick start 中的内容,收发消息已经全部成功。

源码分析

整个过程以描述+代码注释的方式进行。

从发送消息开始

下面贴了一段简单的发送消息的代码,逻辑很简单,先创建并启动 producer, 然后发送消息。

// 创建 Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

       producer.start(); // 启动 - 这是个 netty client

       for (int i = 0; i < 128; i++)
           try {
               {  // 构造消息
                   Message msg = new Message("TopicTest",
                       "TagA",
                       "OrderID188",
                       "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                   // 发送消息,这里是以同步的方式拿到返回结果。
                   // 异步则是通过 callback
                   // 单向则不关心结果
                   SendResult sendResult = producer.send(msg);

                   System.out.printf("%s%n", sendResult);
               }

           } catch (Exception e) {
               e.printStackTrace();
           }

       producer.shutdown(); // 关闭

确实非常简单对吧,接下来进入 producer.start(); 方法。

producer.start()

producerDefaultMQProducer 的实例,于是我们来看 DefaultMQProducer 是啥。

DefaultMQProducer

分析一个类就跟我们自己写一个类的流程差不多,先看他的属性和构造方法,然后再去看他的功能,也就是提供了哪些方法。

  • 属性

    源码中这部分是有注释的,可以先自己过一遍源码。

       protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
    
       // 重试的响应码
       private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
               ResponseCode.TOPIC_NOT_EXIST,
               ResponseCode.SERVICE_NOT_AVAILABLE,
               ResponseCode.SYSTEM_ERROR,
               ResponseCode.NO_PERMISSION,
               ResponseCode.NO_BUYER_ID,
               ResponseCode.NOT_IN_CURRENT_UNIT
       ));
    
       // 生产者组,这个概念后面细说
       private String producerGroup;
    
       // 这就是自动创建主题机制的那个 Topic
       private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
    
       // 前面 Namesrv 里提过,一个 Broker 默认情况下会为每个 topic 创建 4个 readQueue 和 4个 writeQueue。  
       private volatile int defaultTopicQueueNums = 4;
    
       private int sendMsgTimeout = 3000;
    
       // 这个是压缩消息的阈值,Message.body 的大小超过 4k 就进行压缩
       private int compressMsgBodyOverHowmuch = 1024 * 4;
    
       // 发送消息失败会重试两次(总共最多发3次) 后面细说
       private int retryTimesWhenSendFailed = 2;
    
       // 这是发送异步消息失败也是重试两次
       // 源码注释里写了,这里也先提一下,异步消息失败的重试机制可能会造成消息重复发送。
       private int retryTimesWhenSendAsyncFailed = 2;
    
       // 这个是消息发送失败时,是否切换另外的 broker
       private boolean retryAnotherBrokerWhenNotStoreOK = false;
    
       // 消息大小的上限 aka 4M
       private int maxMessageSize = 1024 * 1024 * 4;
    
       // 这个先无视
       private TraceDispatcher traceDispatcher = null;
    
    
  • 构造方法

    数了一下...有10个构造方法,就不全贴出来了,只看这一个就行。

    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
     }
    

    可以看出来 DefaultMQProducer 其实就是对 DefaultMQProducerImpl 做了一层包装,他的其他方法就不列出来了,可以自己观察一下,这里我们直接去看 DefaultMQProducer::start()

DefaultMQProducer::start()

// DefaultMQProducer::start()
public void start() throws MQClientException {
      this.setProducerGroup(withNamespace(this.producerGroup));
      // 实际实现启动的是 DefaultMQProducerImpl 类
      this.defaultMQProducerImpl.start();
      if (null != traceDispatcher) {
          try {
              traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
          } catch (MQClientException e) {
              log.warn("trace dispatcher start failed ", e);
          }
      }
  }

接下来进入 DefaultMQProducerImpl::start()

DefaultMQProducerImpl::start()

public void start() throws MQClientException {
    this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
        // 起手就是个 switch
        switch (this.serviceState) {
            // serviceState 属性默认是 CREATE_JUST
            // 而无论是 DefaultMQProducerImpl 或者 DefaultMQProducer 的构造方法中都没有修改该属性的操作,所以我们的 producer.start() 肯定会进入 CREATE_JUST 分支。
            case CREATE_JUST:
                // 进来之后直接先标记为 “启动失败” 的状态。
                this.serviceState = ServiceState.START_FAILED;

                /* --- 校验开始 --- */

                // 然后开始检查配置,实际就是检查 producerGroup 是否存在,是不是默认的 "DEFAULT_PRODUCER" ,不符合要求就抛个异常提醒用户修改。
                // 比如 new DefaultMQProducer();时没有自定义 producerGroup 时,rocketmq 会默认你的 producerGroup 为 “DEFAULT_PRODUCER”, 之后代码运行到这里就抛异常了
                this.checkConfig();

                // 这段判定的作用跟上面的 this.checkConfig(); 其实差不多,也是提醒你要自定义 producerGroup。
                // 只要 ProducerGroup 不等于 "CLIENT_INNER_PRODUCER"  
                // 这个默认值是走 MQClientInstance 初始化 producer 用的。
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    // 这里要做的事情就是 如果 InstanceName 为 "DEFAULT" 就改成 Pid#TimeStamp 的形式。
                    // 目的是为了区分不同的 producer 实例
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                /* --- 校验结束 --- */

                /**
                   创建 MQ客户端的实例,这里简单说一下 MQClientInstance 是个啥,本质上就是 netty 客户端 + RocketMQ 自身的需求实现。
                   注意这里说的客户端/client 的概念是不区分 Consumer 和 Producer ,这两种角色都看作客户端。
                   MQClientManager ,他里面维护了一个 clientID 和 MQClientInstance 一一对应的 kv 结构,这里为了满足并发的需要用的是 ConcurrentHashMap。
                */

                /*
                  然后我们来看代码, MQClientManager.getInstance() 返回的是 MQClientManager 的
                  getOrCreateMQClientInstance() 返回的是 MQClientInstance 。
                  也就是前面说的 netty 客户端。
                  getOrCreateMQClientInstance 方法的逻辑是 key(clientId) 对应的 value(MQClientInstance) 存在则直接返回,不存在则创建后返回。
                */
                // 创建 instance
                // 无视掉 mQClientFactory 这个变量名,就是个 instance.
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                /*
                 MQClientInstance 创建好了,但是 Instance 的角色还不明确,前面说了,它可以是生产者也可以是消费者,这里 registerProducer 就是给 MQClientInstance 明确身份
                 registerProducer() 方法就是把刚创建的这个 MQClientInstance 注册到其内部维护的 producerTable 中
                 producerTable 也是 ConcurrentHashMap,他的 key 就是创建 Producer 时我们自定义的 ProducerGroup
                 一个 group 只对应一个 instance, 从这里我们就可以看出 ProducerGroup 这个 group 概念是基于集群的.
                 之后消费环节的 ConsumerGroup 同理
                */
                // 把 instance 注册到 producerTable,明确 instance 角色
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) { // ProducerGroup 和 MQClientInstance 一一对应,重复就会抛下面的异常
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                // 把 producer 的信息存入本地的 Topic 路由信息表中,并且初始化该 topic 的路由信息
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    mQClientFactory.start(); // 最终可以看出,producer.start() 其实就是启动 netty 客户端。
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // 将 ServiceState 修改为 运行中   
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        // producer 发送首次心跳请求。
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 启动定时任务,这个定时任务是扫描并移除 requestFutureTable 中的过期请求
        // 初始启动延迟3秒,之后一秒执行一次
        this.startScheduledTask();

    }

简单总结一下,创建 producer 并执行 producer.start() 的过程,实际上就是创建了一个自定义的 netty 客户端,标记为 Producer 角色,并且初始化其对应的 groupTable 和本地路由信息表,然后启动该客户端。客户端启动后发出首次心跳,并开启定时任务这个流程。

producer.send(Message msg)

搞清楚了什么是 producerproducer.start() 都做了什么之后,接下来我们来看 producer.send(),也就是发送消息的流程。

DefaultMQProducer::send(Message msg)

    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 校验 msg
        Validators.checkMessage(msg, this);
        // 完善 Topic 信息
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg);
    }

依然是包装。

DefaultMQProducerImpl::send(Message msg)

    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
  public SendResult send(Message msg,
      long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      // 可以看到我们直接 producer.send(msg) 时,默认为同步的方式。
      // CommunicationMode.SYNC
      return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
  }

主要看 sendDefaultImpl() 方法:

  private SendResult sendDefaultImpl(
      Message msg,
      final CommunicationMode communicationMode, // 同步、异步、单向由该参数决定
      final SendCallback sendCallback,
      final long timeout
  ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      // 校验
      // 检查 ServiceState 是不是 RUNNING
      this.makeSureStateOK();
      // 校验 msg 的合法性,是不是为空啊,是不是超过4m最大小限制等等。
      // 跟 defaultMQProducer::send 里面那行校验是一样的是。
      Validators.checkMessage(msg, this.defaultMQProducer);  

      final long invokeID = random.nextLong();

      // 用来算超时的各种时间戳变量
      long beginTimestampFirst = System.currentTimeMillis();
      long beginTimestampPrev = beginTimestampFirst;
      long endTimestamp = beginTimestampFirst;

      // tryToFindTopicPublishInfo 方法就是 Intro 中提到的 producer 发消息时的整个路由流程
      // 前面看 producer.start() 时,结尾部分不是创建了一个空的路由信息么 new TopicPublishInfo()
      // 这个空的 topicPublishInfo 也在这里完成真正的初始化。
      // 后面看 DefaultMQProducerImpl 时具体展开。
      TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

      // 本地路由表不为空 才继续执行
      if (topicPublishInfo != null && topicPublishInfo.ok()) {
          boolean callTimeout = false;
          MessageQueue mq = null;
          Exception exception = null;
          SendResult sendResult = null;

          // timesTotal就是一条消息的发送总次数,同步模式下 最多3次,1是正常发送那一次,剩下的两次是默认的重试次数;
          // 非同步模式为1次,非同步模式就是单向和异步,单向自然就是一次,异步其实是可以重试的,也就是不止一次。
          int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
          int times = 0;

          /* 下面是发送流程  */

          // brokersSent 发送次数
          String[] brokersSent = new String[timesTotal];
          for (; times < timesTotal; times++)
              // mq 里保存了 topic, brokerName, queueId 三项属性。
              String lastBrokerName = null == mq ? null : mq.getBrokerName();
              /*
                Broker 会为 Topic 创建多个 Queue,默认为 4个 readQueue 和 4个 writeQueue
                selectOneMessageQueue()这个方法这里不过多展开了,可以自己看一下。
               */
              // 根据路由信息找到用哪个 Queue,放在什么位置。
              MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
              if (mqSelected != null) {
                  mq = mqSelected;
                  brokersSent[times] = mq.getBrokerName();
                  try {
                      beginTimestampPrev = System.currentTimeMillis();
                      if (times > 0) {
                          //Reset topic with namespace during resend.
                          msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                      }
                      long costTime = beginTimestampPrev - beginTimestampFirst;
                      if (timeout < costTime) {
                          callTimeout = true;
                          break;
                      }

                      // 这里还是先不展开说明,概括一下就是 ENCODE 好数据包,然后执行 netty 的 writeAndFlush
                      // sendKernelImpl 再下一层的代码不明白的可以去看一眼 Netty,过一遍 netty 的 quick start 再回来看毕竟好懂。

                      // 发送消息并拿到返回结果。
                      sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

                      // 不同模式下返回结果的处理
                      switch (communicationMode) {
                          case ASYNC: // 异步获取结果是利用 callback
                              return null;
                          case ONEWAY: // 单向发送不需要返回结果
                              return null;
                          case SYNC:  // 这里是我们要看的同步
                              if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                // 重试逻辑,这个变量名 =_=
                                  if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                      continue;
                                  }
                              }

                              return sendResult;
                          default:
                              break;
                      }
                      // 后面都是异常的处理啥的
                  } catch (RemotingException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());
                      exception = e;
                      continue;
                  } catch (MQClientException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());
                      exception = e;
                      continue;
                  } catch (MQBrokerException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());
                      exception = e;
                      if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                          continue;
                      } else {
                          if (sendResult != null) {
                              return sendResult;
                          }

                          throw e;
                      }
                  } catch (InterruptedException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                      log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());

                      log.warn("sendKernelImpl exception", e);
                      log.warn(msg.toString());
                      throw e;
                  }
              } else {
                  break;
              }
          }

          if (sendResult != null) {
              return sendResult;
          }

          String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
              times,
              System.currentTimeMillis() - beginTimestampFirst,
              msg.getTopic(),
              Arrays.toString(brokersSent));

          info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

          MQClientException mqClientException = new MQClientException(info, exception);
          if (callTimeout) {
              throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
          }

          if (exception instanceof MQBrokerException) {
              mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
          } else if (exception instanceof RemotingConnectException) {
              mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
          } else if (exception instanceof RemotingTimeoutException) {
              mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
          } else if (exception instanceof MQClientException) {
              mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
          }

          throw mqClientException;
      }

      validateNameServerSetting();

      throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
          null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
  }

到此 producer.send() 的源码分析告一段落。

前面说过,发送消息支持三种模式,同步、异步、单向。 从前面的代码中我们可以发现,无论那种,最终具体实现都是 defaultMQProducerImpl::sendDefaultImpl() 里完成的。同步和异步的区别主要是入参的不同以及异步需要 callback 获取返回结果。而单向跟同步一个逻辑,只不过返回值被无视了。

文章最后稍微再补充一些关于 defaultMQProducerImpl 的内容。

发送消息这里还有一个可以看一下,批量发送消息,也就是 DefaultMQProducer::batch()方法.

DefaultMQProducer::batch()`

批量其实就是给单次发送的处理流程外面加了层循环。

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {

      // MessageBatch 内部结构就是 List<Message> messages;
      MessageBatch msgBatch;
      try {
          // 这里 generateFromList 其实主要是对消息集合进行过滤
          // 不满足批量发送条件会直接抛异常 注意跟后面的 Validators.checkMessage 进行区分
          msgBatch = MessageBatch.generateFromList(msgs);

          for (Message message : msgBatch) {

              // 校验消息合法性 非批量发送也有这一步
              Validators.checkMessage(message, this);

              // 在 Message.properties 里添加一组 kv 参数
              // UNIQ_KEY:具体的key值,就通过 ip + host + 类加载的 hashcode 啥的有兴趣自己去看
              MessageClientIDSetter.setUniqID(message);

              // 设置 topic,非批量发送也有这一步
              message.setTopic(withNamespace(message.getTopic()));
          }

          // 这里 msgBatch.encode() 就是对整个 messageList 进行 encode。
          // 具体细节下面展开说
          msgBatch.setBody(msgBatch.encode());
      } catch (Exception e) {
          throw new MQClientException("Failed to initiate the MessageBatch", e);
      }
      msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
      return msgBatch;
  }

通过代码可以看出batch()方法的逻辑就是把所有 message 加好对应的标记,之后批量 encode,然后存放到msgBatch.setBody等待发送。

  // msgBatch.encode() 就是调用 MessageDecoder.encodeMessages()
  public static byte[] encodeMessages(List<Message> messages) {
      //TODO refactor, accumulate in one buffer, avoid copies

      // 那个 TODO 是源码里的,无视掉就好了

      // 创建一个 buffer list 用来存 encode 之后的 message。
      List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
      // 目前还不知道所有 message 加起来一共占用多大内存,所以初始化为0.
      int allSize = 0;

      // 这个循环,首先对 msg 逐条 encode。
      // 然后把 encode 完的 msg 存到 buffer list里
      // 之后累加计算所有 msg 最终需要多大的 buffer / 也就是内存。
      for (Message message : messages) {
          byte[] tmp = encodeMessage(message);
          encodedMessages.add(tmp);
          allSize += tmp.length;
      }

      // 按照计算出的 size 分配内存
      byte[] allBytes = new byte[allSize];
      int pos = 0;
      /* 通过 System.arraycopy 来将前面 buff list,
      也就是 encodedMessages,中的数据复制到 allBytes 中,
      这个 allBytes 就是之后 msgBatch.setBody()的参数。
      */
      for (byte[] bytes : encodedMessages) {
          System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
          pos += bytes.length;
      }
      return allBytes;
  }

DefaultMQProducerImpl

接下来我们来看看 DefaultMQProducerImpl 类提供的各种具体实现,不过还是老样子,先看属性和构造方法。

  • 属性

       // 随机数
       private final Random random = new Random();
       // 外层的壳子
       private final DefaultMQProducer defaultMQProducer;
       // 之前 intro 里面提过,Producer 本地是有缓存 topic 的路由信息的,就是存在这里。
       //本地路由信息表。key: Topic, Value: TopicPublishInfo
       private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
           new ConcurrentHashMap<String, TopicPublishInfo>();
       // 存 hook 的 list      
       private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();      
       private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();
       // hook ,支持两个阶段,request 之前 和 request 之后
       private final RPCHook rpcHook;
    
       // 异步发送时用的 workQueue 和 Executor
       private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
       private final ExecutorService defaultAsyncSenderExecutor;
    
       // 定时任务的 Executor
       private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "RequestHouseKeepingService"));
    
       // 这俩也是一套 workQueue + Executor。
       protected BlockingQueue<Runnable> checkRequestQueue;
       protected ExecutorService checkExecutor;
    
       // 状态字段 代表 instance 刚刚创建
       private ServiceState serviceState = ServiceState.CREATE_JUST;
    
       // instance
       private MQClientInstance mQClientFactory;
    
       private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
       // 压缩级别
       private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
       // 失败策略
       private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    
       private ExecutorService asyncSenderExecutor;
    
  • 构造方法

      public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
            this.defaultMQProducer = defaultMQProducer;
            this.rpcHook = rpcHook;
    
            // 下面就是创建 workQueue 和 ThreadPoolExecutor  
            this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
            this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors(),
                Runtime.getRuntime().availableProcessors(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.asyncSenderThreadPoolQueue,
                new ThreadFactory() {
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                    }
                });
        }
    
    
  • tryToFindTopicPublishInfo 方法

     private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
          // 从本地路由信息表里,根据 topic 取出对应的本地路由信息
         TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
         if (null == topicPublishInfo || !topicPublishInfo.ok()) { // 如果 topic 的路由信息不存在
             // 保护代码        
             this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    
    
             // 从 NameServer 上拉数据,更新本地路由信息表以及一系列相关信息。
             // 这个方法是线程安全的。 这个展开也超长,有兴趣可以自己看一下
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
             // 更新完本地路由信息表后重新取出 topic 的路由信息
             topicPublishInfo = this.topicPublishInfoTable.get(topic);
         }
    
         // 路由信息存在,则直接返回。
         if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
             return topicPublishInfo;
         } else { // 还不存在,再次去 broker 更新
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
             topicPublishInfo = this.topicPublishInfoTable.get(topic);
             return topicPublishInfo;
         }
     }
    
  • sendKernelImpl

    这里就是前面说的 producer.send() 方法

    // 这个可就太长了,只贴 switch 这段
    switch (communicationMode) {
        case ASYNC: // 异步
            Message tmpMessage = msg;
            boolean messageCloned = false;
            if (msgBodyCompressed) {
                //If msg body was compressed, msgbody should be reset using prevBody.
                //Clone new message using commpressed message body and recover origin massage.
                //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
                msg.setBody(prevBody);
            }
    
            if (topicWithNamespace) {
                if (!messageCloned) {
                    tmpMessage = MessageAccessor.cloneMessage(msg);
                    messageCloned = true;
                }
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
            // 超时判断
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
            }
            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                brokerAddr,
                mq.getBrokerName(),
                tmpMessage,
                requestHeader,
                timeout - costTimeAsync,
                communicationMode,
                sendCallback,
                topicPublishInfo,
                this.mQClientFactory,
                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                context,
                this);
            break;
        case ONEWAY:
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
            }
    
            // 同步发送消息。
            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                brokerAddr,
                mq.getBrokerName(),
                msg,
                requestHeader,
                timeout - costTimeSync,
                communicationMode,
                context,
                this);
            break;
        default:
            assert false;
            break;
    }
    

    有兴趣的话可以从 this.mQClientFactory.getMQClientAPIImpl() 方法继续往下看, 再下面一层就是 netty 层了,如何封装 request/response,header/body,api 怎么写什么的。 最终你应该能找到对 NettyRemotingAbstract::invokeSyncImpl() 方法的调用,这个方法 里就能看到熟悉的 channel.writeAndFlush(request)

总结

本文以跟随 producer.send() 为主线,通过对源码进行跟踪来了解 producer 发送消息背后的整体逻辑和流程。

PS. RocketMQ 的源码是真的很难读,后面的篇幅会进一步省略一些与主线逻辑关系不大的代码片段。

上次编辑于:
贡献者: Lament