本文系记录对Java中消息队列的学习资料,如有异议,欢迎联系我讨论修改。PS:图侵删!如果看不清图请访问:传送门
消息队列基本概念
消息模型
- 点对点:消息生产者向消息队列中发送了一个消息后,只能被一个消费者消费一次
- 发布/订阅:消息生产者向频道发送一个消息后,多个消费者可以从该频道订阅到这条消息并消费
发布订阅模型与观察者模式的区别
- 观察者模式中,观察者和主题都知道对方的存在;而在发布与订阅模式中,生产者与消费者不知道对方的存在,它们之间通过频道进行通信
- 观察者模式是同步的,当事件触发时,主题会调用观察者的方法,然后等待方法返回;而发布与订阅模式是异步的,生产者向频道发送一个消息之后,就不需要关心消费者何时去订阅这个消息,可以立即返回
消息队列
消息队列是一个存放消息的容器,类似于数据结构中的队列,具有先进先出,双端操作的特性。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。
异步处理为何能提高系统性能
在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧增,使得响应速度变慢。但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数据库有更好的伸缩性),因此响应速度得到大幅改善。消息队列具有很好的削峰作用的功能——即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。
但是,用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。
消息队列如何降低系统间的耦合性
如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性更好。在大型网站中通常用利用消息队列实现事件驱动结构。消息队列使利用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。另外为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。
1.6 简述消息队列会带来的潜在问题
- 系统可用性降低:需要着力设计消息丢失或MQ挂掉的处理策略
- 系统复杂度提高:需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题
- 一致性问题:异步在提高响应速度时,由于消息不能保证绝对正确消费,可能会带来一致性问题
消息队列的组件
- Broker:消息服务器,作为server提供消息核心服务
- Topic:主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播
- Partition:对于一个主题,通常进行分块,然后冗余地存储在多个节点上保证可用性
- Offset:偏移量,通常使用Topic-Partition-Offset来绝对定位一条消息
- Producer:消息生产者,业务的发起方,负责生产消息传输给broker
- Consumer:消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
- Queue:队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收
- Message:消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输
消息队列的使用场景
- 异步通信:异步处理机制,允许用户把消息放入队列,但并不立即处理它
- 解耦:降低工程间的强依赖程度,针对异构系统进行适配
- 冗余:把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险
- 扩展性:消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可
- 过载保护:消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃
- 可恢复性:消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
- 顺序保证:大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理
- 缓冲:消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度
消息队列的常用协议
- AMQP:一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制
- MQTT:该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议
- STOMP:STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互
消息队列选型的标准
- 功能需求
- 性能需求
- 可用性需求
- 易用性需求
- 横向对比
常见消息队列选型参数对比
数据可靠性:
RocketMQ的同步刷盘在单机可靠性上比Kafka更高,不会因为操作系统Crash,导致数据丢失。Kafka同步Replication理论上性能低于RocketMQ的同步Replication,原因是Kafka的数据以分区为单位组织,意味着一个Kafka实例上会有几百个数据分区,RocketMQ一个实例上只有一个数据分区,RocketMQ可以充分利用IO组Commit机制。
性能对比:
Kafka与RocketMQ均为10w/s以上。
消息投递实时性:
Kafka使用短轮询方式,实时性取决于轮询间隔时间,0.8以后版本支持长轮询;RocketMQ使用长轮询,同Push方式实时性一致,消息的投递延时通常在几个毫秒。
失败重试:
Kafka消费失败不支持重试;RocketMQ消费失败支持定时重试,每次重试间隔时间顺延。
严格的消息顺序:
Kafka支持消息顺序,但是一台代理宕机后,就会产生消息乱序;RocketMQ支持严格的消息顺序,在顺序消息场景下,一台Broker宕机后,发送消息会失败,但是不会乱序。
定时消息:
Kafka不支持定时消息;RocketMQ支持两类定时消息。
场景应用
如何解决消息队列重复消费以及保证消息消费的幂等性
消息队列在使用中,可能会出现消息重复消费的情况:
- 例如,在Kafka中,使用offset标记消息的序号,当消费者消费数据之后每隔一段时间(定时),会提交已消费消息的offset表明已消费这些消息,如果消费者重启,那么下次从offset编号的消息继续消费。但是如果消费者重启时,本次已处理的消息的offset还没有提交,那么下次必然会从上次提交的offset出开始消费,造成重复消费。
- 例如,在rabbitMQ中,生产者已把消息发送给mq,而mq在给生产者返回ack时生产者断网,未接收到确认消息,生产者判定消息发送失败,下次网络重连时,生产者重发消息;消费者在消费mq中消息后向mq返回ack时断网,导致mq未收到确认消息,该条消息会重新发送给其他消费者或断网重连后发送给其他消费者,造成重复消费。
消息重复处理,可能会带来不必要的性能开销,但是如果能保证消息处理的幂等性,适量的重复消费也是可以接受的。实现消息消费幂等性的手段如下:
- 对于数据库插入,先根据主键查询,如果有数据就更新
- 对于Redis写入,Redis使用set模式,具有天然的幂等性
- 消息队列对每条消息设置全局唯一且与业务无关的id,消费者消费时,先根据id查询(Redis)是否已消费,如果没消费,设置这个消息id为已消费。核心原理是做查询
- 对于数据库可以基于唯一键约束,重复插入会报错,导致数据库中不会出现脏数据
如何保证消息队列的高可用
(集群架构怎么设计保证高容错性) 消息队列有具有三种可用性模式:
- 单机模式,易于部署,但是可用性低,一旦宕机,就无法提供服务
- 普通集群模式,无高可用性,在多台机器上启动多个RabbitMQ实例。用户创建的Queue只会存放在一个RabbitMQ实例上,每个实例都同步Queue的元数据。一旦用户连接的不是主实例,这个实例会去主实例拉取数据,然后提供服务。这种模式缺点如下:MQ集群内部可能会产生大量数据传输;无法提供高可用性,主实例节点宕机,无法提供数据;并没有做到分布式,容易产生单点压力过大
- 镜像集群模式,每个实例保留其他实例的完整镜像,写入Queue后自动同步到其他实例的Queue上。虽然能保证节点宕机后,其他节点具有完整数据,服务不至于中断。但是这种模式会使得集群内部网络带宽消耗严重,扩展性不高;同步所有节点的数据容易超出机器的容量
Kafka的高可用设计方案
Kafka由多个broker-节点组成,每个broker都是一个节点;创建一个topic,topic是一个逻辑概念,代表了一类消息,可以被认为是消息被发送到的地方。topic通常被多个消费者订阅,每个topic由多个partition-分区组成,partition具有自己专属的partition号,通常是从0开始的。用户从partition尾部追加写入消息。partition中的每条消息都会被分配一个唯一的序列号,被称为offset-位移,offset是从0开始顺序递增的整数。一条消息的定位由一个三元组topic-partition-offset组成。
Kafka为了实现高可用,采用了冗余机制,通过replica-副本备份,防止数据丢失。其中,副本分为两类,Leader副本和Follower副本。Follower副本不提供给客户端,即不响应客户端发送来的消息写入与消费请求,它仅仅被动地向Leader副本获取数据。Leader副本则负责提供服务。一旦Leader副本所在的broker宕机,Kafka会从剩余的replica中选出新的leader继续提供服务。Kafka保证同一个partition的多个replica一定不会分配在同一个broker中,副本因子则决定了partition的备份数量。
此外,Kafka还有一个ISR的概念,即与Kafka维护的leader replica保持同步的replica集合,只有该集合中的副本才有机会竞选为leader。而生产者写入的一条消息只有被ISR中所有副本都接收到时,消息才被视为已提交状态。Kafka对于没有提交成功的消息不做任何交付成功保证,它只保证在ISR存活的情况下,以已提交的消息不会丢失。若ISR中有N个副本,那么该分区最多可以容忍N-1个副本崩溃而不丢失已提交消息。
如何保证消息的可靠性传输
RabbitMQ的解决方案:
- 生产者丢失消息:生产者在发送消息时由于网络问题可能会导致消息丢失。解决办法如下,开启RabbitMQ事务功能,事务提供回滚、重试、提交等功能,但是会影响吞吐量,且事务是同步的,无法异步执行;使用Confirm机制,每次写消息时,都会分配一个唯一的id,当向MQ中写消息时处理成功MQ会回传一个ack消息,如果MQ未能处理则回调一个nack接口,随后进行重试。事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息rabbitmq接收了之后会异步回调你一个接口通知你这个消息接收到了
- MQ丢失消息:掉电可能会引发数据丢失,解决办法是开启持久化。持久化不保证数据绝对不丢失,可能会以极小的概率导致少量数据丢失。持久化可以和生产者的confirm配合,消息持久化到磁盘后才通知生产者ack
- 消费者丢失消息:消费者在消费中,突然宕机或进程挂了,可能导致数据丢失。解决办法是利用ACK机制,而且关闭自动ACK,通过API调用,当逻辑处理完后再手动ACK
Kafka的解决方案:
- 消费者丢失消息:消费者已经提交offset,然而该offset对应的消息尚未真正被消费完,导致消息丢失。同理,关闭自动提交offset,手动提交offset,这种情况虽然会产生消息重复消费,但只要保证消费幂等性,少量的重复消费也是可以接受
- Kafka丢失数据:某个broker宕机,导致重新竞选副本leader,然而竞选时可能会出现某些follower还没有完成数据同步。解决方法如下:首先设置系统的副本因子必须大于1;设置min.insync.replicas值必须大于1,要求一个leader至少感知到有至少一个follower还跟自己保持联系;在producer端设置acks=all,要求每个消息写入所有副本后才认为消息提交成功;在producer端设置retries=MAX,一旦写入失败就无限重试
如何保证消息的顺序性
首先先描述下一些消费顺序错乱的场景:
- RabbitMQ中,一个Queue,多个Consumer的情况
- Kafka中,一个Topic,一个Partition,一个Consumer但是内部具有多线程结构
解决方案:
- RabbitMQ拆分队列,每个Queue对应一个Consumer
- Kafka:消费者中使用内存队列解决,将相同Hash过的数据放在一个内存队列里,采用单线程消费,写n个内存的Queue,每个线程分别消费一个Queue
如何解决消息队列的延时以及过期失效问题
RabbtiMQ可以设置过期时间的,也就是TTL。如果消息在queue中积压超过一定的时间就会被RabbitMQ给清理掉,这个数据就没了。此时不存在消息积压,而是消息丢失。解决办法是采用批量重导技术,被丢弃的数据在后半夜平峰时,重新灌入MQ处理。
消息队列满了以后该怎么处理
队列满了,可能是消费者出现了问题导致消费速度太慢。从多角度查询问题根源,可能是队列太长导致磁盘写满、下游MySQL宕机导致消费者Hang住。分情况解决问题。
如果有几百万消息持续积压几小时怎么解决
如果让消费者以默认速度消费,肯定会花费大量时间,不太容易接受这种解决办法。为了提高消费速度,可以采用紧急扩容,步骤如下:
- 先修复消费者的故障,恢复其处理速度,修复后先不要返工
- 新建Topic,令partition是原来的10倍,临时建立好原先10-20倍的队列数量
- 设置一个临时的分发数据的消费者程序,该程序用于消费积压的数据,直接将积压的数据均匀写入10倍扩容后的队列中
- 利用10倍的机器部署消费者,分别消费10倍的队列
- 快速消费积压数据后恢复原先的架构继续生产消费
假设1万个订单积压在mq里面,没有处理,导致1000个订单丢失,如何处理
采用批量重导,手动查出1000个订单,在后半夜平峰时期手动插入MQ重新处理
如何设计一个消息队列
https://zhuanlan.zhihu.com/p/21649950
- 具有可伸缩性
- 保证高可用性
- 保证高稳定性
- 持久化功能
Kafka
Kafka的设计精要
- 吞吐量/延时
- 消息持久化
- 负载均衡和故障转移
- 伸缩性
Kafka为什么具有高吞吐低延时
Kafka的写入快,每次都将数据写入os的页缓存上,由os决定何时写回磁盘,这就有三个优势:
- os的页缓存(避免穿透到底层物理磁盘上获取数据)是在内存中分配的,所以消息写入速度非常快
- 繁琐的底层I/O由os处理
- 写入操作采用追加写入的方式,避免了磁盘随机写操作
- 利用了Linux上的sendfile系统调用,也即零拷贝技术
Kafka中的消息持久化
Kafka具有持久化功能,将信息持久化到磁盘上,好处如下:
- 解耦消息发送与消息消费:Kafka提供了生产者-消费者的完整解决方案,消息生产完交给Kafka服务器保存即可,提高了整体的吞吐量
- 实现灵活的消息处理:支持对已处理过的消息在未来的某个时间点再次处理,即消息重演
- 持久化的刷盘策略不同,Kafka所有数据都会立即被写入文件系统的持久化日志中,避免过高的内存消耗,将空间用于页缓存,进一步提高整体性能
Kafka的伸缩性
- 伸缩性表示向分布式系统中增加额外的计算资源时吞吐量提升的能力。如果服务器是无状态的或将状态交给专门的协调服务来做,例如ZooKeeper,那么整个集群的服务器间无需繁重的状态共享,可以极大降低维护复杂度。扩容时,简单地启动新机器,向ZooKeeper注册即可。
RabbitMQ
RabbitMQ的简介
RabbitMQ是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ 的具体特点可以概括为以下几点:
- 可靠性: RabbitMQ使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等。
- 灵活的路由: 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。这个后面会在我们将 RabbitMQ 核心概念的时候详细介绍到。
- 扩展性: 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
- 高可用性: 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
- 支持多种协议: RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。
- 多语言客户端: RabbitMQ几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript等。
- 易用的管理界面: RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。在安装 RabbitMQ 的时候会介绍到,安装好 RabbitMQ 就自带管理界面。
- 插件机制: RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。感觉这个有点类似 Dubbo 的 SPI机制。
RabbitMQ交换器
在RabbitMQ中,消息并不是被直接投递搭到Queue中,先经过Exchanger-交换器,然后把消息分配到对应的Queue中。Exchanger用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到则会返回给生产者或直接丢弃。Exchanger有四种类型,对应不同的路由策略:
- fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,不需要做任何判断操作,所以fanout类型是所有的交换机类型里面速度最快的。fanout类型常用来广播消息
- direct类型的Exchange路由规则也很简单,它会把消息路由到那些Bindingkey与RoutingKey完全匹配的Queue中。direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列
- topic类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到BindingKey和 RoutingKey相匹配的队列中,匹配规则类似正则表达
- headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配
综合对比
https://blog.csdn.net/paincupid/article/details/79721817