Time @2022/06/23
使用消息队列的好处:
- 异步:实际上整个流程的时间没有减少,但是通过消息队列,使整条业务线划分成了一段一段。我的理解是,和工业生产的流水线原理类似。
- 解耦:如果没有消息队列,每当一个新的业务接入,我们都要在主系统调用新接口、或者当我们取消某些业务,我们也得在主系统删除某些接口调用。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,接下来收到消息如何处理,是下游的事情,无疑极大地减少了开发和联调的工作量。
- 削峰:主业务服务器配置一般比较好,通过消息队列,可以防止下游业务被干掉。
采用了发布订阅模型:
RocketMQ中的消息模型:
RocketMQ中的基本概念:
- Broker:代理服务器,即消息队列服务器。一个Topic可以分布在多个Broker上,一个Broker上也可以配置多个Topic。如果Topic消息量很大,应该给它多配置几个队列(可以提高消费者的并发能力),并且尽量分布在多个broker上。
- Producer:生产者,一般业务系统充当生产者。
- Consumer:消费者,支持push和pull两种方式对消息进行消费。
- NameServer:类似ZooKeeper里面的注册中心(Eureka),主要提供Broker管理和路由信息管理。
四个角色在RockerMQ中的架构如图所示。
- 可以看到在broker中也做了主从部署。Slave定期从Master中同步数据(同步刷盘或者异步刷盘),Master寄了就从Slave中消费,但是Slave不能写入消息!
- 为了保证高可用(HA,High Availablity),NameServer是去中心化的,没有主节点而且没有副本。单个Broker和所有NameServer保持长链接,每隔30sBroker向所有NameServer发出心跳,包含自身Topic信息。
- 在生产者向Broker发送消息时,需要从NameServer中获取路由信息,然后采用轮询方法向每隔消息队列中生产数据以达到负载均衡。
- 消费者通过NameServer获取所有Broker路由信息后,向Broker发送Pull请求来获取消息数据。Consumer有两种方式启动——广播和集群。广播模式下,一条消息会发给同一个消费组中的所有消费者,集群模式下只会发送给一个消费者。
- 一个队列只会被一个消费者消费
顺序消费
- 严格顺序:代价很高,不怎么采用了。
- 普通顺序:将同一语义下的消息放入同一个队列中,简单的hash取模即可实现。
重复消费
- 在客户端可以对用户的操作进行幂等校验。
- 在消息队列中,对消费者实现幂等校验。
分布式事务
事务要么都执行,要么都不执行。
分布式事务要解决事务在不同服务之间的问题。
在RocketMQ中采用了 事务消息+事务反查机制 来解决分布式事务问题。
以此图为例说明:
1.实线1向MQ发送prepare消息(此消息对消费者不可见,因此不会被消费),然后再执行producer本地事务。(只有prepare消息发送成功,本地事务才会执行)。
2.执行producer回调函数executeLocalTransaction(),将本地事务执行状态返回给broker。
本地事务状态:
COMMIT_MESSAGE:将不可见消息变为可见
ROLLBACK_MESSAGE:broker回收步骤1中的不可见消息
UNKNOW或者网络异常:broker调用producer中的回调函数checkLocalTransaction()判断producer本地事务是否正常(一般是通过查询数据库中的数据是否已经持久化)
3.消费者消费消息
4.如果有消费者消费失败,则将失败的消息传给broker,即重新写入commitLog,消费者将重新消费;如果此时consumer和broker连接断开,consumer调用submitConsumeRequestLater()方法,在consumer处重新消费,16次之后,则必须通过工单、日志等方式进行人工干预使producer事务进行回退。如果此时consumer和broker网络断开,consumer会将该消息存入失败队列,并重新消费失败队列中的消息,并且继续调用submitConsumeRequestLater()。
消息堆积问题
生产者生产太快:在生产者部分使用限流降级的方法。
消费者消费太慢:
- 增加消费者实例、增加消息队列中每个主题的队列数量。
- 检查是否是消费者出现了大量的消费错误。
- 打印一下日志看是否哪个线程卡死,出现了锁资源不释放的问题。
刷盘策略
- 同步刷盘:消息追加到内存后立即刷到硬盘中存储
- 异步刷盘:消息追加到内存中,在后台异步刷到硬盘中存储。
如上图所示,同步刷盘小等待一个刷盘成功的ACK,同步刷盘对MQ消息可靠性是很好的保障,因此在性能上会有较大影响,一般适用于金融等特定业务场景。
异步刷盘则是开启一个线程异步执行刷盘操作,采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量,适用于发验证码等对于消息保证要求不高的业务场景。只有在broker宕机的时候会丢失部分数据。
RocketMQ默认采用异步刷盘策略。
同步复制和异步复制
这里指broker中的主从同步。
同步复制:只有消息同步双写到主从节点上时才返回写入成功。
异步复制:消息写入主节点后即返回写入成功。
异步复制时,当主节点挂掉,消费者会从从节点进行消费(producer不能再生产消息了),待主节点重启后,从节点会继续复制主从节点不同步的部分。
使用多主从设置(同一个topic可以分布在不同的broker中,设置多主从,一个broker的主节点挂掉,部署在其他broker的主节点依旧可用),可以保证producer的生产能够正常进行。但是这种方式又无法保证顺序(如之前的部分所述,为保证消息的顺序性,我们通常将一类语义信息发送到同一个队列,举个例子,主节点A负责订单A类,其他节点是无法代替主节点A,设置多主节点A1、A2、A3的话,订单A类的消费顺序就无法得到保证)。
RocketMQ中采用的Dledger解决了这个问题:在写入消息过程中,要求消息最少复制到半数以上节点(会牺牲一些效率),才给producer返回写入成功,并且它支持通过选举切换主节点。
存储机制
- CommitLog:消息主体以及元数据的存储主体,单个文件大小默认1G,文件名长度20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
- ConsumeQueue:消息消费队列,作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的hashcode值,可以视为基于topic的commitlog索引文件,组织结构为:topic/queue/file,存储路径为$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。consumeLog采用定长设置,每个条目20字节,8字节的commitLog物理偏移量、4字节的消息长度、8字节tag的hashcode,单个文件由30w个条目组成,像数组一样可以随机访问,每个consumeQueue大小约5.72M。
- IndexFile:提供了一种可以通过key和时间区间来查询消息的方法。
RocketMQ采用了混合型存储结构,broker单个实例下所有的队列共用一个日志数据文件来存储消息。
Kafka则为每个Topic分配一个存储文件。
两者一个把东西赛一个大房间,另一个把东西塞到不同功能的小房间。
显而易见,前者数据写入效率更高,但也会在读取时遍历整个房间。
然而RocketMQ引入了consumerQueue作为每个队列的索引文件夹,来提升消息的读取效率,可以根据句队列的消息序号,计算出索引的全局位置(索引序号*索引固定长度20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。