RocketMQ 源码分析-发送消息
RocketMQ 源码分析-发送消息
这里开始详细聊聊 RocketMQ 发送消息的各种细节。
前面在简介里面说了,从使用者的角度,Producer 看起来就是直接往 Topic 里面塞消息。而实际上呢,要先去 Namesrv 找到 Topic 的路由信息,然后才能确定 Topic 是由哪个 Broker 管理。
发送消息支持三种方式,同步、异步、单向,区别很简单不再赘述,下文以同步为例进行分析。
前置要求
这里就假设读者已经顺利完成了 quick start 中的内容,收发消息已经全部成功。
源码分析
整个过程以描述+代码注释的方式进行。
从发送消息开始
下面贴了一段简单的发送消息的代码,逻辑很简单,先创建并启动 producer
, 然后发送消息。
// 创建 Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start(); // 启动 - 这是个 netty client
for (int i = 0; i < 128; i++)
try {
{ // 构造消息
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息,这里是以同步的方式拿到返回结果。
// 异步则是通过 callback
// 单向则不关心结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown(); // 关闭
确实非常简单对吧,接下来进入 producer.start();
方法。
producer.start()
producer
是 DefaultMQProducer
的实例,于是我们来看 DefaultMQProducer
是啥。
DefaultMQProducer
分析一个类就跟我们自己写一个类的流程差不多,先看他的属性和构造方法,然后再去看他的功能,也就是提供了哪些方法。
属性
源码中这部分是有注释的,可以先自己过一遍源码。
protected final transient DefaultMQProducerImpl defaultMQProducerImpl; // 重试的响应码 private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList( ResponseCode.TOPIC_NOT_EXIST, ResponseCode.SERVICE_NOT_AVAILABLE, ResponseCode.SYSTEM_ERROR, ResponseCode.NO_PERMISSION, ResponseCode.NO_BUYER_ID, ResponseCode.NOT_IN_CURRENT_UNIT )); // 生产者组,这个概念后面细说 private String producerGroup; // 这就是自动创建主题机制的那个 Topic private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; // 前面 Namesrv 里提过,一个 Broker 默认情况下会为每个 topic 创建 4个 readQueue 和 4个 writeQueue。 private volatile int defaultTopicQueueNums = 4; private int sendMsgTimeout = 3000; // 这个是压缩消息的阈值,Message.body 的大小超过 4k 就进行压缩 private int compressMsgBodyOverHowmuch = 1024 * 4; // 发送消息失败会重试两次(总共最多发3次) 后面细说 private int retryTimesWhenSendFailed = 2; // 这是发送异步消息失败也是重试两次 // 源码注释里写了,这里也先提一下,异步消息失败的重试机制可能会造成消息重复发送。 private int retryTimesWhenSendAsyncFailed = 2; // 这个是消息发送失败时,是否切换另外的 broker private boolean retryAnotherBrokerWhenNotStoreOK = false; // 消息大小的上限 aka 4M private int maxMessageSize = 1024 * 1024 * 4; // 这个先无视 private TraceDispatcher traceDispatcher = null;
构造方法
数了一下...有10个构造方法,就不全贴出来了,只看这一个就行。
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { this.namespace = namespace; this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); }
可以看出来
DefaultMQProducer
其实就是对DefaultMQProducerImpl
做了一层包装,他的其他方法就不列出来了,可以自己观察一下,这里我们直接去看DefaultMQProducer::start()
。
DefaultMQProducer::start()
// DefaultMQProducer::start()
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
// 实际实现启动的是 DefaultMQProducerImpl 类
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
接下来进入 DefaultMQProducerImpl::start()
。
DefaultMQProducerImpl::start()
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
// 起手就是个 switch
switch (this.serviceState) {
// serviceState 属性默认是 CREATE_JUST
// 而无论是 DefaultMQProducerImpl 或者 DefaultMQProducer 的构造方法中都没有修改该属性的操作,所以我们的 producer.start() 肯定会进入 CREATE_JUST 分支。
case CREATE_JUST:
// 进来之后直接先标记为 “启动失败” 的状态。
this.serviceState = ServiceState.START_FAILED;
/* --- 校验开始 --- */
// 然后开始检查配置,实际就是检查 producerGroup 是否存在,是不是默认的 "DEFAULT_PRODUCER" ,不符合要求就抛个异常提醒用户修改。
// 比如 new DefaultMQProducer();时没有自定义 producerGroup 时,rocketmq 会默认你的 producerGroup 为 “DEFAULT_PRODUCER”, 之后代码运行到这里就抛异常了
this.checkConfig();
// 这段判定的作用跟上面的 this.checkConfig(); 其实差不多,也是提醒你要自定义 producerGroup。
// 只要 ProducerGroup 不等于 "CLIENT_INNER_PRODUCER"
// 这个默认值是走 MQClientInstance 初始化 producer 用的。
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
// 这里要做的事情就是 如果 InstanceName 为 "DEFAULT" 就改成 Pid#TimeStamp 的形式。
// 目的是为了区分不同的 producer 实例
this.defaultMQProducer.changeInstanceNameToPID();
}
/* --- 校验结束 --- */
/**
创建 MQ客户端的实例,这里简单说一下 MQClientInstance 是个啥,本质上就是 netty 客户端 + RocketMQ 自身的需求实现。
注意这里说的客户端/client 的概念是不区分 Consumer 和 Producer ,这两种角色都看作客户端。
MQClientManager ,他里面维护了一个 clientID 和 MQClientInstance 一一对应的 kv 结构,这里为了满足并发的需要用的是 ConcurrentHashMap。
*/
/*
然后我们来看代码, MQClientManager.getInstance() 返回的是 MQClientManager 的
getOrCreateMQClientInstance() 返回的是 MQClientInstance 。
也就是前面说的 netty 客户端。
getOrCreateMQClientInstance 方法的逻辑是 key(clientId) 对应的 value(MQClientInstance) 存在则直接返回,不存在则创建后返回。
*/
// 创建 instance
// 无视掉 mQClientFactory 这个变量名,就是个 instance.
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
/*
MQClientInstance 创建好了,但是 Instance 的角色还不明确,前面说了,它可以是生产者也可以是消费者,这里 registerProducer 就是给 MQClientInstance 明确身份
registerProducer() 方法就是把刚创建的这个 MQClientInstance 注册到其内部维护的 producerTable 中
producerTable 也是 ConcurrentHashMap,他的 key 就是创建 Producer 时我们自定义的 ProducerGroup
一个 group 只对应一个 instance, 从这里我们就可以看出 ProducerGroup 这个 group 概念是基于集群的.
之后消费环节的 ConsumerGroup 同理
*/
// 把 instance 注册到 producerTable,明确 instance 角色
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) { // ProducerGroup 和 MQClientInstance 一一对应,重复就会抛下面的异常
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 把 producer 的信息存入本地的 Topic 路由信息表中,并且初始化该 topic 的路由信息
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start(); // 最终可以看出,producer.start() 其实就是启动 netty 客户端。
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 将 ServiceState 修改为 运行中
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// producer 发送首次心跳请求。
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 启动定时任务,这个定时任务是扫描并移除 requestFutureTable 中的过期请求
// 初始启动延迟3秒,之后一秒执行一次
this.startScheduledTask();
}
简单总结一下,创建 producer 并执行 producer.start() 的过程,实际上就是创建了一个自定义的 netty 客户端,标记为 Producer 角色,并且初始化其对应的 groupTable 和本地路由信息表,然后启动该客户端。客户端启动后发出首次心跳,并开启定时任务这个流程。
producer.send(Message msg)
搞清楚了什么是 producer
和 producer.start()
都做了什么之后,接下来我们来看 producer.send()
,也就是发送消息的流程。
DefaultMQProducer::send(Message msg)
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 校验 msg
Validators.checkMessage(msg, this);
// 完善 Topic 信息
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
依然是包装。
DefaultMQProducerImpl::send(Message msg)
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 可以看到我们直接 producer.send(msg) 时,默认为同步的方式。
// CommunicationMode.SYNC
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
主要看 sendDefaultImpl()
方法:
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode, // 同步、异步、单向由该参数决定
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 校验
// 检查 ServiceState 是不是 RUNNING
this.makeSureStateOK();
// 校验 msg 的合法性,是不是为空啊,是不是超过4m最大小限制等等。
// 跟 defaultMQProducer::send 里面那行校验是一样的是。
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
// 用来算超时的各种时间戳变量
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// tryToFindTopicPublishInfo 方法就是 Intro 中提到的 producer 发消息时的整个路由流程
// 前面看 producer.start() 时,结尾部分不是创建了一个空的路由信息么 new TopicPublishInfo()
// 这个空的 topicPublishInfo 也在这里完成真正的初始化。
// 后面看 DefaultMQProducerImpl 时具体展开。
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// 本地路由表不为空 才继续执行
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// timesTotal就是一条消息的发送总次数,同步模式下 最多3次,1是正常发送那一次,剩下的两次是默认的重试次数;
// 非同步模式为1次,非同步模式就是单向和异步,单向自然就是一次,异步其实是可以重试的,也就是不止一次。
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
/* 下面是发送流程 */
// brokersSent 发送次数
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++)
// mq 里保存了 topic, brokerName, queueId 三项属性。
String lastBrokerName = null == mq ? null : mq.getBrokerName();
/*
Broker 会为 Topic 创建多个 Queue,默认为 4个 readQueue 和 4个 writeQueue
selectOneMessageQueue()这个方法这里不过多展开了,可以自己看一下。
*/
// 根据路由信息找到用哪个 Queue,放在什么位置。
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// 这里还是先不展开说明,概括一下就是 ENCODE 好数据包,然后执行 netty 的 writeAndFlush
// sendKernelImpl 再下一层的代码不明白的可以去看一眼 Netty,过一遍 netty 的 quick start 再回来看毕竟好懂。
// 发送消息并拿到返回结果。
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
// 不同模式下返回结果的处理
switch (communicationMode) {
case ASYNC: // 异步获取结果是利用 callback
return null;
case ONEWAY: // 单向发送不需要返回结果
return null;
case SYNC: // 这里是我们要看的同步
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 重试逻辑,这个变量名 =_=
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
// 后面都是异常的处理啥的
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
continue;
} else {
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
到此 producer.send()
的源码分析告一段落。
前面说过,发送消息支持三种模式,同步、异步、单向。 从前面的代码中我们可以发现,无论那种,最终具体实现都是 defaultMQProducerImpl::sendDefaultImpl()
里完成的。同步和异步的区别主要是入参的不同以及异步需要 callback 获取返回结果。而单向跟同步一个逻辑,只不过返回值被无视了。
文章最后稍微再补充一些关于 defaultMQProducerImpl
的内容。
发送消息这里还有一个可以看一下,批量发送消息,也就是 DefaultMQProducer::batch()
方法.
DefaultMQProducer::batch()`
批量其实就是给单次发送的处理流程外面加了层循环。
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
// MessageBatch 内部结构就是 List<Message> messages;
MessageBatch msgBatch;
try {
// 这里 generateFromList 其实主要是对消息集合进行过滤
// 不满足批量发送条件会直接抛异常 注意跟后面的 Validators.checkMessage 进行区分
msgBatch = MessageBatch.generateFromList(msgs);
for (Message message : msgBatch) {
// 校验消息合法性 非批量发送也有这一步
Validators.checkMessage(message, this);
// 在 Message.properties 里添加一组 kv 参数
// UNIQ_KEY:具体的key值,就通过 ip + host + 类加载的 hashcode 啥的有兴趣自己去看
MessageClientIDSetter.setUniqID(message);
// 设置 topic,非批量发送也有这一步
message.setTopic(withNamespace(message.getTopic()));
}
// 这里 msgBatch.encode() 就是对整个 messageList 进行 encode。
// 具体细节下面展开说
msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
throw new MQClientException("Failed to initiate the MessageBatch", e);
}
msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
return msgBatch;
}
通过代码可以看出batch()
方法的逻辑就是把所有 message 加好对应的标记,之后批量 encode,然后存放到msgBatch.setBody
等待发送。
// msgBatch.encode() 就是调用 MessageDecoder.encodeMessages()
public static byte[] encodeMessages(List<Message> messages) {
//TODO refactor, accumulate in one buffer, avoid copies
// 那个 TODO 是源码里的,无视掉就好了
// 创建一个 buffer list 用来存 encode 之后的 message。
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
// 目前还不知道所有 message 加起来一共占用多大内存,所以初始化为0.
int allSize = 0;
// 这个循环,首先对 msg 逐条 encode。
// 然后把 encode 完的 msg 存到 buffer list里
// 之后累加计算所有 msg 最终需要多大的 buffer / 也就是内存。
for (Message message : messages) {
byte[] tmp = encodeMessage(message);
encodedMessages.add(tmp);
allSize += tmp.length;
}
// 按照计算出的 size 分配内存
byte[] allBytes = new byte[allSize];
int pos = 0;
/* 通过 System.arraycopy 来将前面 buff list,
也就是 encodedMessages,中的数据复制到 allBytes 中,
这个 allBytes 就是之后 msgBatch.setBody()的参数。
*/
for (byte[] bytes : encodedMessages) {
System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
pos += bytes.length;
}
return allBytes;
}
DefaultMQProducerImpl
接下来我们来看看 DefaultMQProducerImpl
类提供的各种具体实现,不过还是老样子,先看属性和构造方法。
属性
// 随机数 private final Random random = new Random(); // 外层的壳子 private final DefaultMQProducer defaultMQProducer; // 之前 intro 里面提过,Producer 本地是有缓存 topic 的路由信息的,就是存在这里。 //本地路由信息表。key: Topic, Value: TopicPublishInfo private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>(); // 存 hook 的 list private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>(); // hook ,支持两个阶段,request 之前 和 request 之后 private final RPCHook rpcHook; // 异步发送时用的 workQueue 和 Executor private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue; private final ExecutorService defaultAsyncSenderExecutor; // 定时任务的 Executor private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "RequestHouseKeepingService")); // 这俩也是一套 workQueue + Executor。 protected BlockingQueue<Runnable> checkRequestQueue; protected ExecutorService checkExecutor; // 状态字段 代表 instance 刚刚创建 private ServiceState serviceState = ServiceState.CREATE_JUST; // instance private MQClientInstance mQClientFactory; private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>(); // 压缩级别 private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); // 失败策略 private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); private ExecutorService asyncSenderExecutor;
构造方法
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { this.defaultMQProducer = defaultMQProducer; this.rpcHook = rpcHook; // 下面就是创建 workQueue 和 ThreadPoolExecutor this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000); this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1000 * 60, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); } }); }
tryToFindTopicPublishInfo 方法
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { // 从本地路由信息表里,根据 topic 取出对应的本地路由信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { // 如果 topic 的路由信息不存在 // 保护代码 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 从 NameServer 上拉数据,更新本地路由信息表以及一系列相关信息。 // 这个方法是线程安全的。 这个展开也超长,有兴趣可以自己看一下 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); // 更新完本地路由信息表后重新取出 topic 的路由信息 topicPublishInfo = this.topicPublishInfoTable.get(topic); } // 路由信息存在,则直接返回。 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { // 还不存在,再次去 broker 更新 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
sendKernelImpl
这里就是前面说的 producer.send() 方法
// 这个可就太长了,只贴 switch 这段 switch (communicationMode) { case ASYNC: // 异步 Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } // 超时判断 long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } // 同步发送消息。 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; }
有兴趣的话可以从
this.mQClientFactory.getMQClientAPIImpl()
方法继续往下看, 再下面一层就是 netty 层了,如何封装 request/response,header/body,api 怎么写什么的。 最终你应该能找到对NettyRemotingAbstract::invokeSyncImpl()
方法的调用,这个方法 里就能看到熟悉的channel.writeAndFlush(request)
。
总结
本文以跟随 producer.send()
为主线,通过对源码进行跟踪来了解 producer 发送消息背后的整体逻辑和流程。
PS. RocketMQ 的源码是真的很难读,后面的篇幅会进一步省略一些与主线逻辑关系不大的代码片段。