初识 MQ(消息队列)

消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

20250810090142

“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。

消息队列的作用

  1. 解耦:当有多个线程需访问某个相同线程的数据时,该线程通过对应的方法来与请求调用的线程通信。为了降低这种强耦合,就可以使用 MQ,我们只需要把数据发送到 MQ,其他系统如果需要数据,则从 MQ 中获取即可。
    20250810090518

应用:
20250810091452
当用户发出一个支付请求时,支付服务只需将事件发送给 Broker 代理,Broker 同时通知所有的服务,当有某个服务需要暂停服务时,我们只需要取消订阅事件就可,解决了强耦合的问题。 2. 异步:一个客户端请求发送进来,系统 A 会调用系统 B、C、D 三个系统,同步请求的话,响应时间就是系统 A、B、C、D 的总和,也就是 800ms。如果使用 MQ,系统 A 发送数据到 MQ,然后就可以返回响应给客户端,不需要再等待系统 B、C、D 的响应,可以大大地提高性能。
20250810090927 3. 削峰:假设系统 A 在某一段时间请求数暴增,有 5000 个请求发送过来,系统 A 这时就会发送 5000 条 SQL 进入 MySQL 进行执行,MySQL 对于如此庞大的请求当然处理不过来,MySQL 就会崩溃,导致系统瘫痪。如果使用 MQ,系统 A 不再是直接发送 SQL 到数据库,而是把数据发送到 MQ,MQ 短时间积压数据是可以接受的,然后由消费者每次拉取 2000 条进行处理,防止在请求峰值时期大量的请求直接发送到 MySQL 导致系统崩溃。
20250810091215

优点:

  • 耦合度低
  • 吞吐量提升
  • 故障隔离
  • 流量削峰

缺点:

  • 依赖于 Broker 的可靠性、安全性、吞吐能力
  • 结构复制,不好追踪

20250810093430

RabbitMQ 学习记录

RabbitMQ 是一款使用 Erlang 语言开发的,实现 AMQP(高级消息队列协议)的开源消息中间件。
特点:

  • 可靠性。支持持久化,传输确认,发布确认等保证了 MQ 的可靠性。
  • 灵活的分发消息策略。这应该是 RabbitMQ 的一大特点。在消息进入 MQ 前由 Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群。多台 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker。
  • 多种协议。RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

安装

  1. docker 镜像安装
  • docker pull rabbitqp:3-management
  • ```shell
    docker run \
    -e RABBIIMQ_DEFAULT_USER_itcast \
    -e RABBIIMQ_DEFAULT_PASS_123321 \
    —name mq \
    —hostname mq1 \
    -p 15672:15672 \
    -p 5673:5672 \
    -d \
    rabbitmq:3-management
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50

    2. 下载压缩包安装

    - 安装 erLang 语言
    [Erlang 官网](http://www.erlang.org/downloads?spm=a2c6h.12873639.article-detail.5.433733dfT7UlIW),下载后双击安装文件,更改安装目录,其它默认即可
    - 配置环境变量,增加`ERLANG_HOME`变量名,变量值为`erlang的安装目录`。
    - 配置 Path,增加`%ERLANG_HOME%\bin`
    - 验证安装,打开 powershell,输入`erl -version`
    - 安装 RabbitMQ 客户端
    在 RabbitMQ 的[github 项目](https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3?spm=a2c6h.12873639.article-detail.6.433733dfT7UlIW&file=v3.7.3)中,下载 windows 版本的服务端安装包
    - 安装完成后,找到安装目录
    ![20250810095346](https://lypicgo.myskill-blog.cn//images/20250810095346.png)
    - 在此目录下打开 cmd 命令,输入 rabbitmq-plugins enable rabbitmq_management 命令安装管理页面的插件
    - 双击双击 rabbitmq-server.bat 启动脚本,然后打开服务管理可以看到 RabbitMQ 正在运行,打开浏览器输入 http://localhost:15672,账号密码默认是:guest/guest

    ![20250810103004](https://lypicgo.myskill-blog.cn//images/20250810103004.png)

    ## 组成

    - Broker:消息队列服务进程。此进程包括两个部分:Exchange 和 Queue。
    - Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列。
    - Queue:消息队列,存储消息的队列。
    - Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
    - Consumer:消息消费者。消费队列中存储的消息。
    ![20250810104359](https://lypicgo.myskill-blog.cn//images/20250810104359.png)

    **流程**

    - 消息生产者连接到 RabbitMQ Broker,创建 connection,开启 channel。
    - 生产者声明交换机类型、名称、是否持久化等。
    - 生产者发送消息,并指定消息是否持久化等属性和 routing key。
    - exchange 收到消息之后,根据 routing key 路由到跟当前交换机绑定的相匹配的队列里面。
    - 消费者监听接收到消息之后开始业务处理。

    ## AMQP

    ## Spring AMQP 概述

    Spring AMQP 是一个基于 AMQP 协议的消息中间件框架,它提供了一个简单的 API 来发送和接收异步、可靠的消息。Spring AMQP 还提供了一些高级特性,如消息转换器、消息路由、消息过滤和消息拦截等。

    Spring AMQP 由两个模块组成(每个模块在发行版中用一个 JAR 表示):spring-amqp 和 spring-rabbit。

    ### 在父工程添加 Spring-amqp 的依赖

    ```xml
    <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>3.0.4-SNAPSHOT</version>
    </dependency>

基本消息队列

20250810105118

工作流程

  1. 连接 Rabbit 服务器
1
2
3
4
5
6
7
spring:
rabbitmq:
host: 127.0.0.1 # ip地址
port: 5672 # 端口号
username: guest # 用户名
password: guest # 密码
virtual-host: / # 虚拟主机
  1. 在 publisher 服务中利用 RabbitTemplate 发送消息到队列中
1
2
3
4
5
6
7
8
9
@Autowired
private RabbitTemplate rabbitTemplate;

public void testSendMessage(){
String queueName = "simple.queue";
String message = "hello,spring amqp"
rabbitTemplate.convertAndSend(queueName,message);
}

  1. 在 consumer 服务中编写消息逻辑,绑定所对应的队列
1
2
3
4
5
6
@Component
//使用queuesToDeclare属性,如果不存在则会创建队列
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public class RabbitDemoConsumer(String msg) throws InterruptedException{
System.out.println("spring recevice message: " + msg);
}

工作队列

一个队列绑定多个消费者。
消息预取:当我们有大量的消息进入队列时,我们的队列会将消息进行投递,而我们的消费者会提前将消息拿过来消费,不考虑有没有能力消费
消息预取限制:preFetch,这个属性可以控制预取消息的上限

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成之后才能获取下一个消息

20250810152208

发布订阅

允许将同一消息发送给多个消费者。实现方式是加入了交换机

交换机作用:接受生产者的消息,并转发到队列中,不可以缓存消息,路由失败,消息丢失

20250810155245

按照交换机类型又分为 3 种
20250810155750

Fanout Exchange:广播

会将接受到的消息路由到每一个跟其绑定的 queue
20250810155543

  1. 在消费者中声明一个配置类交换机、队列、绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class FanoutConfig {
// 声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("交换机名字")
}
// 声明第一个队列
@Bean
public Queue fanoutQueue1(){
return new Queue("队列名字")
}
// 绑定队列和交换机
public Binding bindingQueue1(Queue fanoutQueue1,FanoutExange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

// 以相同的方式声明第n个队列,并完成绑定
}
  1. 将消息发送到交换机
1
2
3
4
5
6
7
8
@Autowired
private RabbitTemplate rabbitTemplate;

public void testSendMessage(){
String queueName = "simple.queue";
String message = "hello,spring amqp"
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
  1. 消费者监听消息队列

Direct Exchange:路由

会将消息根据规则路由到指定的 queue

  • 每个 queue 都与 Exchange 设置一个 BindingKey
  • 发布者发送消息时,指定消息的 RoutingKey
  • Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的对队列
    20250810163159
  1. 在消费者中利用@RabbitListener 声明 Exchange、queue、RoutingKey
1
2
3
4
5
6
7
8
9
@RabbitListener(bindings = @queueBinding(
value = @Queue(name = "队列名字"),
exchange = @Exchange(name = "交换机名字",type = ExchangeTypes.DIRECT),
key = {"key1","key2",...}
))
public void listenDirectQueue1(String msg){
System.out.println(msg);
}

  1. 在发送时指定 key,交换机,和消息
1
2
3
4
5
6
7
8
@Autowired
private RabbitTemplate rabbitTemplate;

public void testSendMessage(){
String queueName = "simple.queue";
String message = "hello,spring amqp"
rabbitTemplate.convertAndSend(exchangeName,"routingkey",message);
}

Topic Exchange:主题

与 direct Exchange 类似,区别在与 routingkey 必须是多个单词的列表,并且以.分割。Queue 与 Exchange 指定 BindingKey 时可以使用通配符:

  • :代表 0 个或多个单词

  • *:代指一个单词
    20250810165658
  1. 在消费者中声明交换机、队列
1
2
3
4
5
6
7
8
9
@RabbitListener(bindings = @queueBinding(
value = @Queue(name = "队列名字"),
exchange = @Exchange(name = "交换机名字",type = ExchangeTypes.TOPIC),
key = "#.key"
))
public void listenTopicQueue1(String msg){
System.out.println(msg);
}

消息转换器

Spring 的消息对象的处理默认实现的是 SimpleMessageConverter,基于 JDK 的 ObjectOutputStream 完成序列化。它的转化占用的字节太长,不仅速度慢,占用空间大,而且不安全。
如果要修改只需要定义一个 MessageConverter 类型的 Bean 即可。

  1. 引入依赖
1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jsckson.dataformat</groupId>
<artifactId>jsckson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
  1. 在生产者服务声明 MessageConverter:
1
2
3
4
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}

RabbitMQ 进阶

MQ 在执行时存在可靠性问题,如果生产者给 Broder 发送消息时出现故障,或者 Broder 出现故障消费者就收不到消息,
20250810190011

发送者可靠性问题

由于网络波动,可能会出现客户端连接 MQ 失败的情况。

生产者重试

解决:配置开启重连机制

1
2
3
4
5
6
7
8
9
spring:
rabbitqp:
connectio-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重连机制
initial-interval: 1000ms # 失败后的初始等待时间
multipier: 1 # 失败后下次的等待时长的倍数,下次等待时长=initial-interval*multiplier
max-attempts: 3 # 最大重连次数

连接重试机制是阻塞式重试,当连接失败时,该线程会进入等待时间

生产者确认

在 MQ 成功接收到消息后会返回确认消息给生产者。返回的结果情况:

  • 消息投递到了 MQ,但是路由失败。此时会通过 publisherReturn 返回路由异常原因,然后返回 ACK,告知投递成功
  • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功
  • 持久消息传递到了 MQ,并且入队完成持久化,返回 ACK,告知投递成功
  • 其它情况返回 NACK,告知投递失败

配置

1
2
3
4
spring:
rabbitqp:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型(none 关闭 simple 同步阻塞等待 confirm 异步回调)
publisher-returns: true # 开启publisher return 机制

编写回调函数
每一个 RabbitTemplate 只能配置一个 ReturnCallback,因此需要在项目启动过程中配置

1
2
3
4
5
6
7
8
9
10
11
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware{
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException{
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback(ReturnedMessage returned){
log.ingo("消息发送失败,应答码{},原因,交换机{},路由键{},消息{}",replyCode,replyText,exchange,routingKey,message.toString());
};
}
}

发送消息,指定消息 id、消息 ConfirmCallback
示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void testPublisherConfirm() throws InterruptException{
// 创建CorrelationData
CorrelationData cd = new CorrelationData();
// 给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>(){

@Override
public void onFailure(Throwable ex){
// Future发生异常时的处理逻辑,基本不会触发
log.error("handle message ack fail",ex)
}
@Override
public void onSuccess(CorrelationData.Confirm result){
// Future接收到回执的执行处理逻辑,参数中的result就是回执内容
if(result.isAck()){
// result.isAck()判断回执是Ack还是nack
log.debug("发送消息成功,收到ack")
}else{
log.error("发送消息失败,收到nack,reason:{}",result.getReason());
}
}

});
// 发送消息
rabbitTemplate.convertAndSend("交换机名字""routingkey","message",cd);
}

MQ 的可靠性问题

在默认情况下,RabbitMQ 会将接收到的信息保存到内存当中以降低消息收发的延迟,这样会存在一些问题

  • 一但 MQ 宕机,内存中的信息就会消失
  • 内存空间有限,当消费者故障或处理过慢时,新来的消息会因为没有空间导致 MQ 阻塞,在 MQ 内部会将老的信息取出来,进而为新的消息腾出空间,在这个过程中 MQ 是处于阻塞不工作状态

解决方案:

  1. 数据持久化(3 个方面)
  • 交换机持久化:默认配置为持久化
  • 队列持久化:默认配置为持久化
  • 消息持久化:默认为持久化
  1. Lazy Queue(惰性队列)
    惰性队列的特征如下:
  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
    配置方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启lazy模式
.build();
}

方式2
```java
@RabbitListener(queuesToDeclare=Queue(
name="lazy.queue",
durable="true",
arguments=@Argument(name="x-queue-mode",value="lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息");
}

消费者的可靠性

  1. 消费者确认机制:当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息的处理状态。
    回执有 3 种可选值:
  • ack:成功处理消息
  • nack:消息处理失败
  • reject:消息处理失败并拒绝该消息,RabbitMQ 从列表中删除该消息,Spring 默认帮我们实现了这个机制,我们只需要通过配置文件来选择 ACK 的处理方式即可,none:不处理,manual:手动处理,auto,自动处理

配置:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge: auto
  1. 消息处理失败策略
    当消费者出现异常后,消息会不断重新到队列,在重新发送给消费者,然后再次尝试,无限循环,导致 mq 的消息处理飙升,带来不必要的压力
    解决方案:
    利用 Spring 的 retry 机制,在消费者出现异常时利用本地重试,而不是无限次的返回到队列,在开始重试模式后,如果重试次数耗尽,如果消息依然失败,则需要有 MessageRecover 接口来处理:
  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
    20250810221111

引入 MessageRecoverer:

1
2
3
4
5
6
7
// 加上类的满足当消费者开启失败重启的情况下才生效的前提
@ConditionalOnProperty(prefix="配置的前缀",name="enable",havingValue="true")

@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"交换机名字""key")
}
1
2
3
4
5
6
7
8
listener:
simple:
prefetch: 1
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初始的失败等待时长为1秒
max-attempts: 3 # 最大重试次数
atateless: true # true无状态,false有状态,如果有事务,这里改为false
  1. 业务幂等性