RocketMQ 源码分析-发送消息
RocketMQ 源码分析-发送消息
这里开始详细聊聊 RocketMQ 发送消息的各种细节。
前面在简介里面说了,从使用者的角度,Producer 看起来就是直接往 Topic 里面塞消息。而实际上呢,要先去 Namesrv 找到 Topic 的路由信息,然后才能确定 Topic 是由哪个 Broker 管理。
发送消息支持三种方式,同步、异步、单向,区别很简单不再赘述,下文以同步为例进行分析。
前置要求
这里就假设读者已经顺利完成了 quick start 中的内容,收发消息已经全部成功。
源码分析
整个过程以描述+代码注释的方式进行。
从发送消息开始
下面贴了一段简单的发送消息的代码,逻辑很简单,先创建并启动 producer, 然后发送消息。
// 创建 Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
       producer.start(); // 启动 - 这是个 netty client
       for (int i = 0; i < 128; i++)
           try {
               {  // 构造消息
                   Message msg = new Message("TopicTest",
                       "TagA",
                       "OrderID188",
                       "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                   // 发送消息,这里是以同步的方式拿到返回结果。
                   // 异步则是通过 callback
                   // 单向则不关心结果
                   SendResult sendResult = producer.send(msg);
                   System.out.printf("%s%n", sendResult);
               }
           } catch (Exception e) {
               e.printStackTrace();
           }
       producer.shutdown(); // 关闭
确实非常简单对吧,接下来进入 producer.start(); 方法。
producer.start()
producer 是 DefaultMQProducer 的实例,于是我们来看 DefaultMQProducer 是啥。
DefaultMQProducer
分析一个类就跟我们自己写一个类的流程差不多,先看他的属性和构造方法,然后再去看他的功能,也就是提供了哪些方法。
- 属性 - 源码中这部分是有注释的,可以先自己过一遍源码。 - protected final transient DefaultMQProducerImpl defaultMQProducerImpl; // 重试的响应码 private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList( ResponseCode.TOPIC_NOT_EXIST, ResponseCode.SERVICE_NOT_AVAILABLE, ResponseCode.SYSTEM_ERROR, ResponseCode.NO_PERMISSION, ResponseCode.NO_BUYER_ID, ResponseCode.NOT_IN_CURRENT_UNIT )); // 生产者组,这个概念后面细说 private String producerGroup; // 这就是自动创建主题机制的那个 Topic private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; // 前面 Namesrv 里提过,一个 Broker 默认情况下会为每个 topic 创建 4个 readQueue 和 4个 writeQueue。 private volatile int defaultTopicQueueNums = 4; private int sendMsgTimeout = 3000; // 这个是压缩消息的阈值,Message.body 的大小超过 4k 就进行压缩 private int compressMsgBodyOverHowmuch = 1024 * 4; // 发送消息失败会重试两次(总共最多发3次) 后面细说 private int retryTimesWhenSendFailed = 2; // 这是发送异步消息失败也是重试两次 // 源码注释里写了,这里也先提一下,异步消息失败的重试机制可能会造成消息重复发送。 private int retryTimesWhenSendAsyncFailed = 2; // 这个是消息发送失败时,是否切换另外的 broker private boolean retryAnotherBrokerWhenNotStoreOK = false; // 消息大小的上限 aka 4M private int maxMessageSize = 1024 * 1024 * 4; // 这个先无视 private TraceDispatcher traceDispatcher = null;
- 构造方法 - 数了一下...有10个构造方法,就不全贴出来了,只看这一个就行。 - public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { this.namespace = namespace; this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); }- 可以看出来 - DefaultMQProducer其实就是对- DefaultMQProducerImpl做了一层包装,他的其他方法就不列出来了,可以自己观察一下,这里我们直接去看- DefaultMQProducer::start()。
DefaultMQProducer::start()
// DefaultMQProducer::start()
public void start() throws MQClientException {
      this.setProducerGroup(withNamespace(this.producerGroup));
      // 实际实现启动的是 DefaultMQProducerImpl 类
      this.defaultMQProducerImpl.start();
      if (null != traceDispatcher) {
          try {
              traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
          } catch (MQClientException e) {
              log.warn("trace dispatcher start failed ", e);
          }
      }
  }
接下来进入 DefaultMQProducerImpl::start()。
DefaultMQProducerImpl::start()
public void start() throws MQClientException {
    this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
        // 起手就是个 switch
        switch (this.serviceState) {
            // serviceState 属性默认是 CREATE_JUST
            // 而无论是 DefaultMQProducerImpl 或者 DefaultMQProducer 的构造方法中都没有修改该属性的操作,所以我们的 producer.start() 肯定会进入 CREATE_JUST 分支。
            case CREATE_JUST:
                // 进来之后直接先标记为 “启动失败” 的状态。
                this.serviceState = ServiceState.START_FAILED;
                /* --- 校验开始 --- */
                // 然后开始检查配置,实际就是检查 producerGroup 是否存在,是不是默认的 "DEFAULT_PRODUCER" ,不符合要求就抛个异常提醒用户修改。
                // 比如 new DefaultMQProducer();时没有自定义 producerGroup 时,rocketmq 会默认你的 producerGroup 为 “DEFAULT_PRODUCER”, 之后代码运行到这里就抛异常了
                this.checkConfig();
                // 这段判定的作用跟上面的 this.checkConfig(); 其实差不多,也是提醒你要自定义 producerGroup。
                // 只要 ProducerGroup 不等于 "CLIENT_INNER_PRODUCER"  
                // 这个默认值是走 MQClientInstance 初始化 producer 用的。
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    // 这里要做的事情就是 如果 InstanceName 为 "DEFAULT" 就改成 Pid#TimeStamp 的形式。
                    // 目的是为了区分不同的 producer 实例
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                /* --- 校验结束 --- */
                /**
                   创建 MQ客户端的实例,这里简单说一下 MQClientInstance 是个啥,本质上就是 netty 客户端 + RocketMQ 自身的需求实现。
                   注意这里说的客户端/client 的概念是不区分 Consumer 和 Producer ,这两种角色都看作客户端。
                   MQClientManager ,他里面维护了一个 clientID 和 MQClientInstance 一一对应的 kv 结构,这里为了满足并发的需要用的是 ConcurrentHashMap。
                */
                /*
                  然后我们来看代码, MQClientManager.getInstance() 返回的是 MQClientManager 的
                  getOrCreateMQClientInstance() 返回的是 MQClientInstance 。
                  也就是前面说的 netty 客户端。
                  getOrCreateMQClientInstance 方法的逻辑是 key(clientId) 对应的 value(MQClientInstance) 存在则直接返回,不存在则创建后返回。
                */
                // 创建 instance
                // 无视掉 mQClientFactory 这个变量名,就是个 instance.
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                /*
                 MQClientInstance 创建好了,但是 Instance 的角色还不明确,前面说了,它可以是生产者也可以是消费者,这里 registerProducer 就是给 MQClientInstance 明确身份
                 registerProducer() 方法就是把刚创建的这个 MQClientInstance 注册到其内部维护的 producerTable 中
                 producerTable 也是 ConcurrentHashMap,他的 key 就是创建 Producer 时我们自定义的 ProducerGroup
                 一个 group 只对应一个 instance, 从这里我们就可以看出 ProducerGroup 这个 group 概念是基于集群的.
                 之后消费环节的 ConsumerGroup 同理
                */
                // 把 instance 注册到 producerTable,明确 instance 角色
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) { // ProducerGroup 和 MQClientInstance 一一对应,重复就会抛下面的异常
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                // 把 producer 的信息存入本地的 Topic 路由信息表中,并且初始化该 topic 的路由信息
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                if (startFactory) {
                    mQClientFactory.start(); // 最终可以看出,producer.start() 其实就是启动 netty 客户端。
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // 将 ServiceState 修改为 运行中   
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        // producer 发送首次心跳请求。
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 启动定时任务,这个定时任务是扫描并移除 requestFutureTable 中的过期请求
        // 初始启动延迟3秒,之后一秒执行一次
        this.startScheduledTask();
    }
简单总结一下,创建 producer 并执行 producer.start() 的过程,实际上就是创建了一个自定义的 netty 客户端,标记为 Producer 角色,并且初始化其对应的 groupTable 和本地路由信息表,然后启动该客户端。客户端启动后发出首次心跳,并开启定时任务这个流程。
producer.send(Message msg)
搞清楚了什么是 producer 和 producer.start() 都做了什么之后,接下来我们来看 producer.send(),也就是发送消息的流程。
DefaultMQProducer::send(Message msg)
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 校验 msg
        Validators.checkMessage(msg, this);
        // 完善 Topic 信息
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg);
    }
依然是包装。
DefaultMQProducerImpl::send(Message msg)
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
  public SendResult send(Message msg,
      long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      // 可以看到我们直接 producer.send(msg) 时,默认为同步的方式。
      // CommunicationMode.SYNC
      return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
  }
主要看 sendDefaultImpl() 方法:
  private SendResult sendDefaultImpl(
      Message msg,
      final CommunicationMode communicationMode, // 同步、异步、单向由该参数决定
      final SendCallback sendCallback,
      final long timeout
  ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      // 校验
      // 检查 ServiceState 是不是 RUNNING
      this.makeSureStateOK();
      // 校验 msg 的合法性,是不是为空啊,是不是超过4m最大小限制等等。
      // 跟 defaultMQProducer::send 里面那行校验是一样的是。
      Validators.checkMessage(msg, this.defaultMQProducer);  
      final long invokeID = random.nextLong();
      // 用来算超时的各种时间戳变量
      long beginTimestampFirst = System.currentTimeMillis();
      long beginTimestampPrev = beginTimestampFirst;
      long endTimestamp = beginTimestampFirst;
      // tryToFindTopicPublishInfo 方法就是 Intro 中提到的 producer 发消息时的整个路由流程
      // 前面看 producer.start() 时,结尾部分不是创建了一个空的路由信息么 new TopicPublishInfo()
      // 这个空的 topicPublishInfo 也在这里完成真正的初始化。
      // 后面看 DefaultMQProducerImpl 时具体展开。
      TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
      // 本地路由表不为空 才继续执行
      if (topicPublishInfo != null && topicPublishInfo.ok()) {
          boolean callTimeout = false;
          MessageQueue mq = null;
          Exception exception = null;
          SendResult sendResult = null;
          // timesTotal就是一条消息的发送总次数,同步模式下 最多3次,1是正常发送那一次,剩下的两次是默认的重试次数;
          // 非同步模式为1次,非同步模式就是单向和异步,单向自然就是一次,异步其实是可以重试的,也就是不止一次。
          int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
          int times = 0;
          /* 下面是发送流程  */
          // brokersSent 发送次数
          String[] brokersSent = new String[timesTotal];
          for (; times < timesTotal; times++)
              // mq 里保存了 topic, brokerName, queueId 三项属性。
              String lastBrokerName = null == mq ? null : mq.getBrokerName();
              /*
                Broker 会为 Topic 创建多个 Queue,默认为 4个 readQueue 和 4个 writeQueue
                selectOneMessageQueue()这个方法这里不过多展开了,可以自己看一下。
               */
              // 根据路由信息找到用哪个 Queue,放在什么位置。
              MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
              if (mqSelected != null) {
                  mq = mqSelected;
                  brokersSent[times] = mq.getBrokerName();
                  try {
                      beginTimestampPrev = System.currentTimeMillis();
                      if (times > 0) {
                          //Reset topic with namespace during resend.
                          msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                      }
                      long costTime = beginTimestampPrev - beginTimestampFirst;
                      if (timeout < costTime) {
                          callTimeout = true;
                          break;
                      }
                      // 这里还是先不展开说明,概括一下就是 ENCODE 好数据包,然后执行 netty 的 writeAndFlush
                      // sendKernelImpl 再下一层的代码不明白的可以去看一眼 Netty,过一遍 netty 的 quick start 再回来看毕竟好懂。
                      // 发送消息并拿到返回结果。
                      sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                      // 不同模式下返回结果的处理
                      switch (communicationMode) {
                          case ASYNC: // 异步获取结果是利用 callback
                              return null;
                          case ONEWAY: // 单向发送不需要返回结果
                              return null;
                          case SYNC:  // 这里是我们要看的同步
                              if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                // 重试逻辑,这个变量名 =_=
                                  if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                      continue;
                                  }
                              }
                              return sendResult;
                          default:
                              break;
                      }
                      // 后面都是异常的处理啥的
                  } catch (RemotingException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());
                      exception = e;
                      continue;
                  } catch (MQClientException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());
                      exception = e;
                      continue;
                  } catch (MQBrokerException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());
                      exception = e;
                      if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                          continue;
                      } else {
                          if (sendResult != null) {
                              return sendResult;
                          }
                          throw e;
                      }
                  } catch (InterruptedException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                      log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());
                      log.warn("sendKernelImpl exception", e);
                      log.warn(msg.toString());
                      throw e;
                  }
              } else {
                  break;
              }
          }
          if (sendResult != null) {
              return sendResult;
          }
          String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
              times,
              System.currentTimeMillis() - beginTimestampFirst,
              msg.getTopic(),
              Arrays.toString(brokersSent));
          info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
          MQClientException mqClientException = new MQClientException(info, exception);
          if (callTimeout) {
              throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
          }
          if (exception instanceof MQBrokerException) {
              mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
          } else if (exception instanceof RemotingConnectException) {
              mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
          } else if (exception instanceof RemotingTimeoutException) {
              mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
          } else if (exception instanceof MQClientException) {
              mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
          }
          throw mqClientException;
      }
      validateNameServerSetting();
      throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
          null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
  }
到此 producer.send() 的源码分析告一段落。
前面说过,发送消息支持三种模式,同步、异步、单向。 从前面的代码中我们可以发现,无论那种,最终具体实现都是 defaultMQProducerImpl::sendDefaultImpl() 里完成的。同步和异步的区别主要是入参的不同以及异步需要 callback 获取返回结果。而单向跟同步一个逻辑,只不过返回值被无视了。
文章最后稍微再补充一些关于 defaultMQProducerImpl 的内容。
发送消息这里还有一个可以看一下,批量发送消息,也就是 DefaultMQProducer::batch()方法.
DefaultMQProducer::batch()`
批量其实就是给单次发送的处理流程外面加了层循环。
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
      // MessageBatch 内部结构就是 List<Message> messages;
      MessageBatch msgBatch;
      try {
          // 这里 generateFromList 其实主要是对消息集合进行过滤
          // 不满足批量发送条件会直接抛异常 注意跟后面的 Validators.checkMessage 进行区分
          msgBatch = MessageBatch.generateFromList(msgs);
          for (Message message : msgBatch) {
              // 校验消息合法性 非批量发送也有这一步
              Validators.checkMessage(message, this);
              // 在 Message.properties 里添加一组 kv 参数
              // UNIQ_KEY:具体的key值,就通过 ip + host + 类加载的 hashcode 啥的有兴趣自己去看
              MessageClientIDSetter.setUniqID(message);
              // 设置 topic,非批量发送也有这一步
              message.setTopic(withNamespace(message.getTopic()));
          }
          // 这里 msgBatch.encode() 就是对整个 messageList 进行 encode。
          // 具体细节下面展开说
          msgBatch.setBody(msgBatch.encode());
      } catch (Exception e) {
          throw new MQClientException("Failed to initiate the MessageBatch", e);
      }
      msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
      return msgBatch;
  }
通过代码可以看出batch()方法的逻辑就是把所有 message 加好对应的标记,之后批量 encode,然后存放到msgBatch.setBody等待发送。
  // msgBatch.encode() 就是调用 MessageDecoder.encodeMessages()
  public static byte[] encodeMessages(List<Message> messages) {
      //TODO refactor, accumulate in one buffer, avoid copies
      // 那个 TODO 是源码里的,无视掉就好了
      // 创建一个 buffer list 用来存 encode 之后的 message。
      List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
      // 目前还不知道所有 message 加起来一共占用多大内存,所以初始化为0.
      int allSize = 0;
      // 这个循环,首先对 msg 逐条 encode。
      // 然后把 encode 完的 msg 存到 buffer list里
      // 之后累加计算所有 msg 最终需要多大的 buffer / 也就是内存。
      for (Message message : messages) {
          byte[] tmp = encodeMessage(message);
          encodedMessages.add(tmp);
          allSize += tmp.length;
      }
      // 按照计算出的 size 分配内存
      byte[] allBytes = new byte[allSize];
      int pos = 0;
      /* 通过 System.arraycopy 来将前面 buff list,
      也就是 encodedMessages,中的数据复制到 allBytes 中,
      这个 allBytes 就是之后 msgBatch.setBody()的参数。
      */
      for (byte[] bytes : encodedMessages) {
          System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
          pos += bytes.length;
      }
      return allBytes;
  }
DefaultMQProducerImpl
接下来我们来看看 DefaultMQProducerImpl 类提供的各种具体实现,不过还是老样子,先看属性和构造方法。
- 属性 - // 随机数 private final Random random = new Random(); // 外层的壳子 private final DefaultMQProducer defaultMQProducer; // 之前 intro 里面提过,Producer 本地是有缓存 topic 的路由信息的,就是存在这里。 //本地路由信息表。key: Topic, Value: TopicPublishInfo private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>(); // 存 hook 的 list private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>(); // hook ,支持两个阶段,request 之前 和 request 之后 private final RPCHook rpcHook; // 异步发送时用的 workQueue 和 Executor private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue; private final ExecutorService defaultAsyncSenderExecutor; // 定时任务的 Executor private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "RequestHouseKeepingService")); // 这俩也是一套 workQueue + Executor。 protected BlockingQueue<Runnable> checkRequestQueue; protected ExecutorService checkExecutor; // 状态字段 代表 instance 刚刚创建 private ServiceState serviceState = ServiceState.CREATE_JUST; // instance private MQClientInstance mQClientFactory; private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>(); // 压缩级别 private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); // 失败策略 private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); private ExecutorService asyncSenderExecutor;
- 构造方法 - public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { this.defaultMQProducer = defaultMQProducer; this.rpcHook = rpcHook; // 下面就是创建 workQueue 和 ThreadPoolExecutor this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000); this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1000 * 60, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); } }); }
- tryToFindTopicPublishInfo 方法 - private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { // 从本地路由信息表里,根据 topic 取出对应的本地路由信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { // 如果 topic 的路由信息不存在 // 保护代码 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 从 NameServer 上拉数据,更新本地路由信息表以及一系列相关信息。 // 这个方法是线程安全的。 这个展开也超长,有兴趣可以自己看一下 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); // 更新完本地路由信息表后重新取出 topic 的路由信息 topicPublishInfo = this.topicPublishInfoTable.get(topic); } // 路由信息存在,则直接返回。 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { // 还不存在,再次去 broker 更新 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
- sendKernelImpl - 这里就是前面说的 producer.send() 方法 - // 这个可就太长了,只贴 switch 这段 switch (communicationMode) { case ASYNC: // 异步 Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } // 超时判断 long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } // 同步发送消息。 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; }- 有兴趣的话可以从 - this.mQClientFactory.getMQClientAPIImpl()方法继续往下看, 再下面一层就是 netty 层了,如何封装 request/response,header/body,api 怎么写什么的。 最终你应该能找到对- NettyRemotingAbstract::invokeSyncImpl()方法的调用,这个方法 里就能看到熟悉的- channel.writeAndFlush(request)。
总结
本文以跟随 producer.send() 为主线,通过对源码进行跟踪来了解 producer 发送消息背后的整体逻辑和流程。
PS. RocketMQ 的源码是真的很难读,后面的篇幅会进一步省略一些与主线逻辑关系不大的代码片段。