RabbitMQ学习记录
初识 MQ(消息队列)
消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。
“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。
消息队列的作用
- 解耦:当有多个线程需访问某个相同线程的数据时,该线程通过对应的方法来与请求调用的线程通信。为了降低这种强耦合,就可以使用 MQ,我们只需要把数据发送到 MQ,其他系统如果需要数据,则从 MQ 中获取即可。
应用:
当用户发出一个支付请求时,支付服务只需将事件发送给 Broker 代理,Broker 同时通知所有的服务,当有某个服务需要暂停服务时,我们只需要取消订阅事件就可,解决了强耦合的问题。 2. 异步:一个客户端请求发送进来,系统 A 会调用系统 B、C、D 三个系统,同步请求的话,响应时间就是系统 A、B、C、D 的总和,也就是 800ms。如果使用 MQ,系统 A 发送数据到 MQ,然后就可以返回响应给客户端,不需要再等待系统 B、C、D 的响应,可以大大地提高性能。
优点:
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰
缺点:
- 依赖于 Broker 的可靠性、安全性、吞吐能力
- 结构复制,不好追踪
RabbitMQ 学习记录
RabbitMQ 是一款使用 Erlang 语言开发的,实现 AMQP(高级消息队列协议)的开源消息中间件。
特点:
- 可靠性。支持持久化,传输确认,发布确认等保证了 MQ 的可靠性。
- 灵活的分发消息策略。这应该是 RabbitMQ 的一大特点。在消息进入 MQ 前由 Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
- 支持集群。多台 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker。
- 多种协议。RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
安装
- docker 镜像安装
- docker pull rabbitqp:3-management
- ```shell
docker run \
-e RABBIIMQ_DEFAULT_USER_itcast \
-e RABBIIMQ_DEFAULT_PASS_123321 \
—name mq \
—hostname mq1 \
-p 15672:15672 \
-p 5673:5672 \
-d \
rabbitmq:3-management1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2. 下载压缩包安装
- 安装 erLang 语言
[Erlang 官网](http://www.erlang.org/downloads?spm=a2c6h.12873639.article-detail.5.433733dfT7UlIW),下载后双击安装文件,更改安装目录,其它默认即可
- 配置环境变量,增加`ERLANG_HOME`变量名,变量值为`erlang的安装目录`。
- 配置 Path,增加`%ERLANG_HOME%\bin`
- 验证安装,打开 powershell,输入`erl -version`
- 安装 RabbitMQ 客户端
在 RabbitMQ 的[github 项目](https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3?spm=a2c6h.12873639.article-detail.6.433733dfT7UlIW&file=v3.7.3)中,下载 windows 版本的服务端安装包
- 安装完成后,找到安装目录

- 在此目录下打开 cmd 命令,输入 rabbitmq-plugins enable rabbitmq_management 命令安装管理页面的插件
- 双击双击 rabbitmq-server.bat 启动脚本,然后打开服务管理可以看到 RabbitMQ 正在运行,打开浏览器输入 http://localhost:15672,账号密码默认是:guest/guest

## 组成
- Broker:消息队列服务进程。此进程包括两个部分:Exchange 和 Queue。
- Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列。
- Queue:消息队列,存储消息的队列。
- Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
- Consumer:消息消费者。消费队列中存储的消息。

**流程**
- 消息生产者连接到 RabbitMQ Broker,创建 connection,开启 channel。
- 生产者声明交换机类型、名称、是否持久化等。
- 生产者发送消息,并指定消息是否持久化等属性和 routing key。
- exchange 收到消息之后,根据 routing key 路由到跟当前交换机绑定的相匹配的队列里面。
- 消费者监听接收到消息之后开始业务处理。
## AMQP
## Spring AMQP 概述
Spring AMQP 是一个基于 AMQP 协议的消息中间件框架,它提供了一个简单的 API 来发送和接收异步、可靠的消息。Spring AMQP 还提供了一些高级特性,如消息转换器、消息路由、消息过滤和消息拦截等。
Spring AMQP 由两个模块组成(每个模块在发行版中用一个 JAR 表示):spring-amqp 和 spring-rabbit。
### 在父工程添加 Spring-amqp 的依赖
```xml
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>3.0.4-SNAPSHOT</version>
</dependency>
基本消息队列
工作流程
- 连接 Rabbit 服务器
1 | spring: |
- 在 publisher 服务中利用 RabbitTemplate 发送消息到队列中
1 |
|
- 在 consumer 服务中编写消息逻辑,绑定所对应的队列
1 |
|
工作队列
一个队列绑定多个消费者。
消息预取:当我们有大量的消息进入队列时,我们的队列会将消息进行投递,而我们的消费者会提前将消息拿过来消费,不考虑有没有能力消费
消息预取限制:preFetch,这个属性可以控制预取消息的上限
1 | spring: |
发布订阅
允许将同一消息发送给多个消费者。实现方式是加入了交换机
交换机作用:接受生产者的消息,并转发到队列中,不可以缓存消息,路由失败,消息丢失
按照交换机类型又分为 3 种
Fanout Exchange:广播
会将接受到的消息路由到每一个跟其绑定的 queue
- 在消费者中声明一个配置类交换机、队列、绑定
1 |
|
- 将消息发送到交换机
1 |
|
- 消费者监听消息队列
Direct Exchange:路由
会将消息根据规则路由到指定的 queue
- 每个 queue 都与 Exchange 设置一个 BindingKey
- 发布者发送消息时,指定消息的 RoutingKey
- Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的对队列
- 在消费者中利用@RabbitListener 声明 Exchange、queue、RoutingKey
1 |
|
- 在发送时指定 key,交换机,和消息
1 |
|
Topic Exchange:主题
与 direct Exchange 类似,区别在与 routingkey 必须是多个单词的列表,并且以.分割。Queue 与 Exchange 指定 BindingKey 时可以使用通配符:
:代表 0 个或多个单词
- *:代指一个单词
- 在消费者中声明交换机、队列
1 |
|
消息转换器
Spring 的消息对象的处理默认实现的是 SimpleMessageConverter,基于 JDK 的 ObjectOutputStream 完成序列化。它的转化占用的字节太长,不仅速度慢,占用空间大,而且不安全。
如果要修改只需要定义一个 MessageConverter 类型的 Bean 即可。
- 引入依赖
1 | <dependency> |
- 在生产者服务声明 MessageConverter:
1 |
|
RabbitMQ 进阶
MQ 在执行时存在可靠性问题,如果生产者给 Broder 发送消息时出现故障,或者 Broder 出现故障消费者就收不到消息,
发送者可靠性问题
由于网络波动,可能会出现客户端连接 MQ 失败的情况。
生产者重试
解决:配置开启重连机制
1 | spring: |
连接重试机制是阻塞式重试,当连接失败时,该线程会进入等待时间
生产者确认
在 MQ 成功接收到消息后会返回确认消息给生产者。返回的结果情况:
- 消息投递到了 MQ,但是路由失败。此时会通过 publisherReturn 返回路由异常原因,然后返回 ACK,告知投递成功
- 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功
- 持久消息传递到了 MQ,并且入队完成持久化,返回 ACK,告知投递成功
- 其它情况返回 NACK,告知投递失败
配置
1 | spring: |
编写回调函数
每一个 RabbitTemplate 只能配置一个 ReturnCallback,因此需要在项目启动过程中配置
1 |
|
发送消息,指定消息 id、消息 ConfirmCallback
示例:
1 | void testPublisherConfirm() throws InterruptException{ |
MQ 的可靠性问题
在默认情况下,RabbitMQ 会将接收到的信息保存到内存当中以降低消息收发的延迟,这样会存在一些问题
- 一但 MQ 宕机,内存中的信息就会消失
- 内存空间有限,当消费者故障或处理过慢时,新来的消息会因为没有空间导致 MQ 阻塞,在 MQ 内部会将老的信息取出来,进而为新的消息腾出空间,在这个过程中 MQ 是处于阻塞不工作状态
解决方案:
- 数据持久化(3 个方面)
- 交换机持久化:默认配置为持久化
- 队列持久化:默认配置为持久化
- 消息持久化:默认为持久化
- Lazy Queue(惰性队列)
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
配置方式:
1 |
|
消费者的可靠性
- 消费者确认机制:当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息的处理状态。
回执有 3 种可选值:
- ack:成功处理消息
- nack:消息处理失败
- reject:消息处理失败并拒绝该消息,RabbitMQ 从列表中删除该消息,Spring 默认帮我们实现了这个机制,我们只需要通过配置文件来选择 ACK 的处理方式即可,none:不处理,manual:手动处理,auto,自动处理
配置:
1 | spring: |
- 消息处理失败策略
当消费者出现异常后,消息会不断重新到队列,在重新发送给消费者,然后再次尝试,无限循环,导致 mq 的消息处理飙升,带来不必要的压力
解决方案:
利用 Spring 的 retry 机制,在消费者出现异常时利用本地重试,而不是无限次的返回到队列,在开始重试模式后,如果重试次数耗尽,如果消息依然失败,则需要有 MessageRecover 接口来处理:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
引入 MessageRecoverer:
1 | // 加上类的满足当消费者开启失败重启的情况下才生效的前提 |
1 | listener: |
- 业务幂等性













