10-生产者启动流程
| 版本 | 内容 | 时间 |
|---|---|---|
| V1 | 新建 | 2022年08月04日22:58:51 |
| V2 | 重构 | 2023年06月11日23:20:07 |
生产者发送消息例子
org.apache.rocketmq.example.quickstart.Producer
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// ...... 省略发送消息部分 ......
producer.shutdown();
}
}可以看到主要是创建一个 DefaultMQProducer 对象,设置一些属性,并将其启动。
生产者启动流程
DefaultMQProducer 构造方法
在官方的示例中,首先创建一个 DefaultMQProducer 对象。
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
// 创建生产者实现对象 DefaultMQProducerImpl
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}前面说过 DefaultMQProducer 里面的方法都是委托给 DefaultMQProducerImpl 对象的,随意继续跟进 DefaultMQProducerImpl 构造方法。
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
// 创建异步发送消息的线程池的任务队列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
// 创建缺省的异步消息线程池,核心线程和最大线程个数都是 CPU 的核心数
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}DefaultMQProducer#start
注意本次分析生产者的启动流程并不会详细分析每个步骤做了什么,我们只需要知晓整体流程就行了,不必拘于细节。
@Override
public void start() throws MQClientException {
// 重新设置生产者组名,
// 规则:如果传递命名空间,则 namespace%group
this.setProducerGroup(withNamespace(this.producerGroup));
// 启动生产者实现类对象
this.defaultMQProducerImpl.start();
// ...... 省略消息轨迹相关 ......
}关键点是this.defaultMQProducerImpl.start();方法,该方法的流程比较长,逐步分析。
校验生产者组的名称是否合法
this.checkConfig();修改当前生产者的实例 id
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
// 修改默认的 instanceName
this.defaultMQProducer.changeInstanceNameToPID();
}public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}获取客户端实例对象 MQClientInstance
// 获取当前进程的 RocketMQ 的客户端实例对象, defaultMQProducer 对应的 MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);MQClientInstance 是客户端的核心类。RocketMQ 使用 MQClientManager 来管理 MQClientInstance 类。这里我们只要知道获取了一个 MQClientInstance 实例就行了。MQClientInstance 封装了 RocketMQ 的网络处理 API,是消息生产者、消息消费者与 NameServer、Broker 通信的入口。例如管理本实例中全部生产者与消费者的生产和消费行为。
生产者注册到 MQClientInstance 中
// 将生产者自己注册到 RocketMQ 客户端实例内,观察者模式
// 一个生产者组只会注册一个
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);将当前生产者 defaultMQProducer 注册到刚才的 MQClientInstance 实例中,将当前生产者放到 MQClientInstance 中管理,在 MQClientInstance 中维护了一个 Map,用于存放生产者实例,key 是生产者组,value 是生产者实例;
MQClientInstance#producerTable
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();添加 TBW102 主题信息
// 添加测试用的 topic,就是 TBW102
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());启动 MQClientInstance 实例
if (startFactory) {
// 启动 RocketMQ 客户端的实例对象的核心入口
mQClientFactory.start();
}启动 MQClientInstance 实例,启动 MQClientInstance 实例下一节分析。
客户端向所有的 broker 节点发送一次心跳
// 强制 RocketMQ 客户端实例向已知的 broker 节点发送一次心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();定时任务:处理回执消息太慢的情况
开启一个定时任务,处理回执消息太慢的情况;
MQClientInstance
MQClientInstance 的核心属性
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
// 省略部分...
private final MQClientAPIImpl mQClientAPIImpl;
private final MQAdminImpl mQAdminImpl;
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
// 会在定时任务中定时去 nameserver 拉取数据更新这个哈希表
// broker 物理节点映射表,key 是 brokerName,value 是 Map
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
// broker 物理节点版本号映射表,key 是 brokerName,value 是 Map
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
// 单线程的调度线程池,用于执行定时任务
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MQClientFactoryScheduledThread");
}
});
// 客户端的协议处理器,用于处理 IO 事件
private final ClientRemotingProcessor clientRemotingProcessor;
// 拉消息服务
private final PullMessageService pullMessageService;
// 消费者负载均衡服务
private final RebalanceService rebalanceService;
// 内部生产者实例,用于处理消费端消息回退
private final DefaultMQProducer defaultMQProducer;
private final ConsumerStatsManager consumerStatsManager;- **producerTable:**当前 client 实例的全部生产者的内部实例;
- consumerTable:当前 client 实例的全部消费者的内部实例;
- **adminExtTable:**当前 client 实例的全部管理实例;
- **mQClientAPIImpl:**实现了全部 client 支持的接口;
- **mQAdminImpl:**管理接口的本地实现类;
- **topicRouteTable:**当前生产者、消费者中全部Topic的本地缓存路由信息;
- **scheduledExecutorService:**本地定时任务,比如定期获取当前 Namesrv 地址、定期同步Namesrv信息、定期更新Topic路由信息、定期发送心跳信息给Broker、定期清理已下线的Broker、定期持久化消费位点、定期调整消费线程数(这部分源代码被官方删除了);
- **clientRemotingProcessor:**请求的处理器,用于处理网络 I/O 请求;
- **pullMessageService:**Pull 服务;
- rebalanceService:消费者重新平衡服务。定期执行重新平衡方法 this.mqClientFactory.doRebalance()。这里的 mqClientFactory 就是 MQClientInstance 实例,通过依次调用MQClientInstance中保存的消费者实例的doRebalance()方法,来感知订阅关系的变化、集群变化等,以达到重新平衡。
- consumerStatsManager:消费监控。
MQClientInstance#start
前面讲到了 MQClientInstance#start,所以下面来分析
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// 开启客户端网络层
this.mQClientAPIImpl.start();
// Start various schedule tasks
// 启动定时任务的入口
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
// 启动内部生产者,消息回退时使用
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}基本流程如下:
- 开启客户端网络层;
- 启动一大堆定时任务;
- 开启拉消息服务 pullMessageService;(后面分析)
- 开启重平衡服务 rebalanceService;(后面分析)
- 启动内部生产者,消息回退时使用,也会注册到 MQClientInstance#producerTable;
OK,那么我们来看看开启了哪些定时任务:
private void startScheduledTask() {
// 假如 nameserver 地址为空
// 一般在业务代码中已经指定了 NameServer 地址,所以这个定时任务一般不会走
if (null == this.clientConfig.getNamesrvAddr()) {
// 定时任务:获取 nameserver 的地址
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 定时任务1,从 nameserver 更新客户端本地的 topic 的路由信息
// 30 秒
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 定时任务2,30 秒
// 1 清除下线的 broker节点
// 2 给所有在线的 broker 发送心跳数据
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 检查客户端路由表,将路由表中不包含的 addr 清理掉,
// 如果被清理的 brokerName 下所有的物理节点都没有了,需要将所有的 broker 映射数据删除
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 定时任务3,消费者持久化消费进度,5 秒钟一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 定时任务4,动态调整消费者线程池,一分钟一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}总共有五个,依次分析
- 定时任务 1:获取 NameServer 的地址(通过一个指定的域名,硬编码),不过一般在业务代码中已经指定了 NameServer 的地址,这个定时任务就不会执行了;
- 定时任务 2:从 NameServer 拉取最新的 topic 路由信息,更新本地的路由信息,这个就是路由发现了;
- 定时任务 3:
- 清除下线的 broker 节点;
- 给所有在线的 broker 发送心跳数据;
- 定时任务 4:消费者持久化消费进度;
- 定时任务 5:动态调整消费者线程池;(没啥用了,RocketMQ 把这个代码删除了)