RocketMQ之DefaultPushConsumer

DefaultMQPushConsumer消息链路

  1. DefaultMQpushConsumer#start方法调用DefaultMQpushConsumerImpl#start方法,接着内部调用MQClientInstance#start方法,接着调用RebalanceService#start方法。

  2. RebalanceService#start方法开启一个线程,执行本类中的runnable#run方法。run方法中调用MQClientInstance#doRebalance方法,这个方法阻塞20秒,循环调用。

  3. MQClientInstance#doRebalance方法中接着调用DefaultMQpushConsumerImpl#doRebalance方法。

  4. DefaultMQpushConsumerImpl#doRebalance中接着调用RebalanceImpl#doRebalance方法。

  5. RebalanceImpl#doRebalance方法接着调用本类方法rebalanceByTopic,根据负载均衡分配策略获取当前消费者组对应的messageQueues,接着调用本类方法updateProcessQueueTableInRebalance。
    在这里插入图片描述

  6. RebalanceImpl#updateProcessQueueTableInRebalance方法中。①内部属性processQueueTable维护messageQueue与processQueue的关系。②调用broker获取messagQueue的消息处理偏移量。每个队列封装参数为PullRequest(里面包含消费者组名称,下一个消息的偏移量,messageQueue,processQueue),调用本地方法dispatchPullRequest传入参数PullRequests。

  7. RebalanceImpl#dispatchPullRequest方法。这是一个抽象方法,实现在RebanlancePushImpl中。在RebanlancePushImpl#dispatchPullRequest中,for循环遍历PullRequests,调用DefaultMQpushConsumerImpl#executePullRequestImmediately(pullRequest)方法执行。
    在这里插入图片描述

  8. DefaultMQpushConsumerImpl#executePullRequestImmediately方法中调用MQClientInstance的属性对象pullMessageService的方法executePullRequestImmediately,将pullReuqest存到到pullMessageServcie的内部属性队列pullMessageQueue中,这是一个LinkedBlockingQueue队列。
    在这里插入图片描述在这里插入图片描述

  9. PullMessageServcie#start方法,开启线程执行本类中的run方法。run方法中①通过take获取pullMessageQueue中的pullRequest,②调用本地的pullMessage方法进行处理。

在这里插入图片描述
在这里插入图片描述
10. PullMessageServcie#pullMessage方法,通过MQClientInstance获取到consumer,我这里用的是DefaultMQpushConsumerImpl,调用DefaultMQpushConsumerImpl#pullMessage(pullRequest)方法。
11. DefaultMQpushConsumerImpl#pullMessage方法。①获取pullRequest中的messageQueue的主题关联的订阅者。②定义回调方法PullCallback。回到方法中,如果从broker拉取消息成功,会将消息放入到processQueue的msgTreeMap中③调用pullAPIWrapper#pullKernelImpl方法拉取消息。
在这里插入图片描述
12. PullAPIWrapper#pullKernelImpl方法,调用MQClientInstance的属性类MQClientAPIImpl的pullMessage方法。
在这里插入图片描述
13. MQClientAPIImpl#pullMessage方法。调用remotingCLient发送netty消息到broker。拿到消息后,调用步骤11中定义的PullCallback方法。
14. DefaultMQpushConsumerImpl#PullCallback方法,将消息封装到ConsumeRequest,提交到MessageListenerConcurrently的任务线程池中consumerExecutor。
在这里插入图片描述
15. ConsumeRequest本身实现runnable接口,实现run方法。ConsumeRequest是MessageListenerConcurrently的内部类。①调用消费者的监听器,将消息传入。②根据监听器类处理响应status,处理响应结果。
在这里插入图片描述 在这里插入图片描述在这里插入图片描述
在这里插入图片描述

DefaultMQPushConsumer消息处理类

  消息处理类ConsumeMessageService。
   DefaultMQPushConsumer–>DefaultMQPushConsumerImpl#start(),根据注册的监听器类型实例化ConsumeMessageService。
  (1) 注册的监听器是:MessageListenerOrderly,对应实例化的消息处理类:ConsumeMessageOrderlyService
   (2) 注册的监听器是:MessageListenerConcurrently,对应实例化的消息处理类: ConsumeMessageConcurrentlyService
  ConsumeMessageConcurrentlyService:
  属性中包含了消费者配置的监听类messageListener,defaultMQPushConsumer,defaultMQPushConsumerImpl,consumeRequestQueue(请求阻塞队列)
。初始化了三个线程池。
  (1)消息处理线程池consumerExecutor,线程池的核心线程,最大线程通过参数配置,setConsumeThreadMin,setConsumeThreadMax,主要处理正常接收到的消息。
  (2) 还有两个定时执行线程池,一个用来执行推迟消费的消息,一个用来定期清理超时消息(15分钟)。

DefaultMQPushConsumer消息偏移量

  消费者有两种模式,集群模式和广播模式。

广播模式

  广播模式,偏移量的处理类offsiteStore的实现类LocalFileOffsetStore。广播模式的偏移量是保存到本地。可以通过参数【rocketmq.client.localOffsetStoreDir】进行配置。
  OffsetStore的load方法读取上面的文件,如果读取失败或者文件内容为空,就会读取备份文件,路径是上面的文件名后面加.bak。这个load方法读取这个json文件,然后把内容读取到LocalFileOffsetStore类的offsetTable(ConcurrentHashMap)这个数据结构。
  在拉取消息的时候,首先会封装PullRequest请求,PullRequest中nextOffset参数需要从offsetTable中获取。
  MQClientInstance的start方法中会开启一个定时任务,默认时间是5秒,每5秒执行一次持久化,将offsetTable持久化到 本地文件。写文件时有以下几步:
1. 首先把内容写入到offsets.json.tmp文件。
2. offsets.json内容备份到offsets.json.bak。
3. 删除offsets.json文件。
4. 把offsets.json.tmp改名为offsets.json。

   总结:广播模式下,偏移量保存在消费者本地服务器。
在这里插入图片描述

集群模式

   集群模式,偏移量的处理类offsiteStore的实现类RemoteBrokerOffsetStore
   集群模式下,偏移量是从Broker端获取,所以客户端RemoteBrokerOffsetStore中的load方法没有内容,是一个空方法。
在这里插入图片描述
   集群模式下,偏移量保存在Broker服务器上。默认路径在/home/${user}/store/config/consumerOffset.json
   集群模式下,消费者端也会定时持久化offsetStore,不过集群模式下这个方法会上报消费点到Broker服务上。Broker服务接收到请求会保存到本地的offsetTable本地缓存中,Broker服务启动也会开启一个定时任务,默认每5秒持久化offsetTable到磁盘文件。
   RebalanceImpl的updateProcessQueueTableInRebalance在拉取消息的时候。都需要先计算nextOffset,这个计算方法是一个抽象方法,由其子类实现。我们就看看push里面的实现RebalancePushImpl里面的实现。 在这里插入图片描述

  1. 首先获取DefaultMQPushConsumer里面的配置consumerFromWhere
  2. 获取对应的offsetStore。我们用的集群模式,这里是RemoteBrokerOffsetStore
  3. 通过不同的类型,走不通的逻辑获取偏移量。我这里配置的是CONSUME_FROM_MAX_OFFSET,是通过RemoteBrokerOffsetStore的readOffset获取偏移量。参数类型是ReadOffsetType.READ_FROM_STORE
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
    long result = -1;
    // 1. 获取DefaultMQPushConsumer里面的配置consumerFromWhere。
    final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getCons

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/633647.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

P6【知识点】【数据结构】【树tree】C++版

树是由一个集合以及在该集合上定义的一种关系构成的,集合中的元素称为树的结点,所定义的关系称为父子关系。父子关系在树的结点之间建立了一个层次结构,在这种层次结构中有一个结点具有特殊的地位,这个结点称为该树的根结点。 二叉…

数据库|基于T-SQL创建数据库

哈喽,你好啊,我是雷工! SQL Server用于操作数据库的编程语言为Transaction-SQL,简称T-SQL。 本节学习基于T-SQL创建数据库。以下为学习笔记。 01 打开新建查询 首先连接上数据库,点击【新建查询】打开新建查询窗口, …

如何在go项目中实现发送邮箱验证码、邮箱+验证码登录

前期准备 GoLand :2024.1.1 下载官网:https://www.jetbrains.com/zh-cn/go/download/other.html Postman: 下载官网:https://www.postman.com/downloads/ 效果图(使用Postman) Google: QQ: And …

java 8--Lambda表达式,Stream流

目录 Lambda表达式 Lambda表达式的由来 Lambda表达式简介 Lambda表达式的结构 Stream流 什么是Stream流? 什么是流呢? Stream流操作 中间操作 终端操作 Lambda表达式 Lambda表达式的由来 Java是面向对象语言,除了部分简单数据类型…

SpringBoot——整合MyBatis

目录 MyBatis 项目总结 1、创建SQL表 2、新建一个SpringBoot项目 3、pom.xml添加依赖 4、application.properties配置文件 5、User实体类 6、UserMapper接口 7、UserMapper.xml映射文件 8、UserController控制器 9、SpringBootMyBatisApplication启动类 10、使用Po…

关于如何创建一个可配置的 SpringBoot Web 项目的全局异常处理

前情概要 这个问题其实困扰了我一周时间,一周都在 Google 上旅游,我要如何动态的设置 RestControllerAdvice 里面的 basePackages 以及 baseClasses 的值呢?经过一周的时间寻求无果之后打算决定放弃的我终于找到了一些关键的线索。 当然在此…

反射的基本知识

基本概念 反射是java在运行过程中的自我观察能力,通过class constructor field method 四个方法来获取一个类的各个组成部分。 反射是在运行状态中对于任意一个类,都能知道这个类的所有属性和方法;对于任意一个对象都能调用它的任意一个方法…

《MySQL是怎样运行的》快速查询秘籍——B+树索引

一.引出索引 前面一章我们说出了数据页的结构,但是如果我们要查找某一条记录的话,怎么办呢? 我们前面知道页与页之间是一个双向链表实现的,我们要找的话,是不是要按照这个链表一个一个找下去,然后找到&am…

数据链路层简单介绍

mac地址(物理地址) mac地址和ip地址,目的都是为了区分网络上的不同设备的,在最开始的时候,mac地址和ip地址是两伙人,独立各自提出的,ip地址是4个字节(早都不够用了)&…

【个人商业画布】你有思考过把自己当成一家公司来经营吗?

商业模式画布(Business Model Canvas),是亚历山大奥斯特瓦德在《商业模式新生代》中提出的一种用于描述商业模式、可视化商业模式、评估商业模式以及改变商业模式的通用语言。它由9个模块构成,帮助创业者理清为“细分客户提供独有价值”,从而…

PersonalLLM——探索LLM是否能根据五大人格特质重新塑造一个新的角色?

1.概述 近年来,大型语言模型(LLMs),例如ChatGPT,致力于构建能够辅助人类的个性化人工智能代理,这些代理以进行类似人类的对话为重点。在学术领域,尤其是社会科学中,一些研究报告已经…

溪谷联运SDK功能全面解析

近期,备受用户关注的手游联运10.0.0版本上线了,不少用户也选择了版本更新,其中也再次迎来了SDK的更新。溪谷软件和大家一起盘点一下溪谷SDK的功能都有哪些吧。 一、溪谷SDK具有完整的运营功能和高度扩展性 1.登录:登录是SDK最基础…

简述MyBatis中#{}引用和${}引用的区别

各位大佬光临寒舍,希望各位能赏脸给个三连,谢谢各位大佬了!!! 目录 1.有无预编译 优点 缺点 2.SQL执行的快慢 3.能否被SQL注入 4.参数输入方式 5.总结 1.有无预编译 #{}是有预编译的而${}是没有预编译的&…

OceanBase集群如何进行OCP的替换

有OceanBase社区版的用户提出替换 OCP 管控平台的需求。举例来说,之前的OCP平台采用单节点,然而随着OceanBase集群的陆续上线和数量的不断增多,担心单节点的OCP可能面临故障风险,而丧失对OceanBase集群的管控能力。另此外&#xf…

创建vue工程、Vue项目的目录结构、Vue项目-启动、API风格

环境准备 介绍:create-vue是Vue官方提供的最新的脚手架工具,用于快速生成一个工程化的Vue项目create-vue提供如下功能: 统一的目录结构 本地调试 热部署 单元测试 集成打包依赖环境:NodeJS 安装NodeJS 一、 创建vue工程 npm 类…

以Linux为例了解线程

我最近开了几个专栏,诚信互三! > |||《算法专栏》::刷题教程来自网站《代码随想录》。||| > |||《C专栏》::记录我学习C的经历,看完你一定会有收获。||| > |||《Linux专栏》&#xff1…

IO系列(八) -浅析NIO工作原理

一、简介 现在使用 NIO 的场景越来越多,很多网上的技术框架或多或少的使用 NIO 技术,譬如 Tomcat、Jetty、Netty,学习和掌握 NIO 技术已经不是一个 Java 攻城狮的加分技能,而是一个必备技能。 那什么是 NIO 呢? NIO…

第06章 数据加载、存储与文件格式

以下内容参考自https://github.com/iamseancheney/python_for_data_analysis_2nd_chinese_version/blob/master/%E7%AC%AC05%E7%AB%A0%20pandas%E5%85%A5%E9%97%A8.md 《利用Python进行数据分析第2版》 用以学习和记录。 输入输出通常可以划分为几个大类:读取文本文…

深海奥秘:鳐鱼肽的肌肤之旅

深海,一个神秘又充满生命力的世界,总是带给我们无尽的惊喜。鳐鱼,又被称为“魔鬼鱼”,它的皮肤中含有一种特殊的肽,这种肽不仅分子量适中,易于人体吸收,还具有极高的消化率和生物利用度。来自北…

科技引领乡村振兴新潮流:运用现代信息技术手段,提升农业生产和乡村管理效率,打造智慧化、现代化的美丽乡村

一、引言 随着科技的不断进步,现代信息技术已经渗透到社会的各个领域,成为推动社会发展的重要力量。在乡村振兴战略的背景下,科技的力量同样不容忽视。本文旨在探讨如何运用现代信息技术手段,提升农业生产和乡村管理效率&#xf…