25-并发消费原理
| 版本 | 内容 | 时间 |
|---|---|---|
| V1 | 新建 | 2023年06月26日23:58:48 |
回顾消息拉取流程
消费者消费 topic 的某个 queue,就会发送拉取消息 RPC 到 broker,消费者拿到 broker 返回的消息就会去消费了,这里看下从 broker 正常拉取到消息的分支。
org.apache.rocketmq.client.consumer.PullCallback#onSuccess 方法里的逻辑
// 获取本次拉取消息的第一条消息的 offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// 将服务器拉取的消息 list 加入到消费者本地的该 queue 的 processQueue 中
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 提交 "消费任务"
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
// 将更新过 pullRequest.nextOffset 字段的 pullRequest 对象,再次放到 pullMessageService 的队列中
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}这部分主要的逻辑就是将 broker 返回的所有消息存到本地消费快照队列 ProcessQueue 中,然后交给 ConsumeMessageService 消费服务去处理。
RocketMQ 中 ConsumeMessageService 分为两种,一种是并发消费 ConsumeMessageConcurrentlyService ,一种是顺序消费 ConsumeMessageOrderlyService。本篇分析并发消费的原理。
并发消费服务简要分析
字段分析
ConsumeMessageConcurrentlyService 类
// 消费者实现对象,推模式实现类
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
// 消费者门面对象 就是一个配置
private final DefaultMQPushConsumer defaultMQPushConsumer;
// 消息监听器(消息处理的逻辑再这里封装,这个 messageListener 由开发者自己实现,并注册到 defaultMQPushConsumer)
private final MessageListenerConcurrently messageListener;
// 消费任务队列
private final BlockingQueue<Runnable> consumeRequestQueue;
// 消费任务线程池
private final ThreadPoolExecutor consumeExecutor;
// 消费者组
private final String consumerGroup;
// 调度线程池,用于延迟提交消费任务
private final ScheduledExecutorService scheduledExecutorService;
// 清理过期消息的任务的 线程池,15 分钟一次
// 从服务器拉取到消息后,回调 pullBack 方法,先将消息放入 processQueue 中,然后把消息提交到消费线程池中执行
private final ScheduledExecutorService cleanExpireMsgExecutors;- 主要是
MessageListenerConcurrently messageListener这个字段,这个是我们真正业务程序员需要实现的逻辑,也就是我们想要怎么消费消息; ThreadPoolExecutor consumeExecutor:因为是并发消费,所以是使用线程池异步提交任务去消费消息的;- 其他的字段遇到时具体分析。
并发消费原理
从 broker 拉取到的消息,通过一些处理后会调用 ConsumeMessageConcurrentlyService#submitConsumeRequest 方法去将消息封装为一个 ConsumeRequest 对象,然后分批提交到线程池中去执行,因为是提交到线程池异步执行,这也就是并发消费的命名的由来。
看一下流程图:
TODO-KWOK
并发消息源码分析
封装任务到消费线程池
ConsumeMessageConcurrentlyService#submitConsumeRequest
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
// 此参数控制一个消费任务 可消费的消息数量,默认 1
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
//
if (msgs.size() <= consumeBatchSize) {
// case:msgs内部的消息数 小于 consumeBatchSize,直接封装一个消费任务提交到消费线程池即可
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// 分页提交
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
// ....... 省略异常重试的处理 ......
}
}
}
}先分析下这个方法的入参:
List<MessageExt> msgs:从 broker 拉取到的一些消息,本次需要消费这些消息;ProcessQueue processQueue:此次消费的本地消费快照队列;MessageQueue messageQueue:此次消费的消息的队列信息;boolean dispatchToConsume:并发消费这里并没有使用,忽略这个;
首先获取 consumeBatchSize 的值,默认值是 1,表示一个消费任务 ConsumeRequest 中只有 1 个消息。根据 consumeBatchSize 的值分为两种情况:
- 如果从 broker 拉取的消息小于等于 consumeBatchSize,那么就封装一个 ConsumeRequest 消费任务去线程池;
- 如果从 broker 拉取的消息大于 consumeBatchSize,那么就分批提交多个 ConsumeRequest 消费任务区线程池;
前面说了半天的 ConsumeRequest 对象,这个对象其实很简单,就是某次消费任务的信息:
class ConsumeRequest implements Runnable {
// 分配该消费任务的消息
private final List<MessageExt> msgs;
// 消息的处理队列
private final ProcessQueue processQueue;
// 消息队列
private final MessageQueue messageQueue;
// ...... 省略 ......
}线程池消费消息
校验本地消费队列状态
// 条件成立说明该 queue 经过 rbl 分配到其他 consumer 了,当前 consumer 不需要再去消费该 queue 的消息了
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}处理回退的消息
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());这里先不分析消息回退的逻辑,下一篇详细分析。
执行前置消息钩子函数
// 消息消费前的钩子函数
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}如果有钩子函数就执行,没有就不执行了,扩展用的,不是核心逻辑。
调用业务方具体消费函数
// 消费开始时间
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
// 设置每条消息的消费开始时间
// 清理过期消息的任务,会检查这个属性,判断是否消费超时
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// 参数 1:msgs,但是这个list不能被修改,添加,删除
// 参数 2:消费上下文,主要是控制消息消费失败时,消费延迟级别
// 返回该批消息的消费结果,即 CONSUME_SUCCESS (消费成功),CONSUME_LATER (需要重新消费)
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
// ...... 异常处理 ......
}这里的 listener 就是我们业务方程序员需要实现的具体消费逻辑了,MessageListenerConcurrently、
执行后置消费钩子函数
// 消息后的钩子函数
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}如果有钩子函数就执行,没有就不执行了,扩展用的,不是核心逻辑。
消息消费后处理
if (!processQueue.isDropped()) {
// 正常走这里
// 处理消费结果
// 1.消费成功需要将 msgs从 pq 移除
// 2.消费失败,需要将消费失败的消息回退给服务器,并且将回退失败的消息(会将回退失败的消息从当前任务移除)再次提交消费任务,最后也就将 CR.msgs 从 pq 移除
// 3.更新消费进度
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}主要是 ConsumeMessageConcurrentlyService#processConsumeResult 方法,这个方法很简单,就不贴源码了,就简单分析下做了什么:
判断消息是否消费正常:
- 当消息消费失败时,需要将消息回退到 broker,具体消息回退下一篇分析;
- 从 ProcessQueue 中删除已经消费的消息,无论是否成功消费都会移除;
- 更新 Consumer 端的本地消费进度;