在现代 Java 开发中,异步非阻塞编程已成为处理高并发场景的核心能力。Reactor 作为响应式编程的主流实现,不仅是 Spring WebFlux 的底层依赖,更能帮助开发者构建高效、可扩展的数据流处理系统。本文将从基础到实战,全面解析 Reactor 的核心概念与使用方法。

一、什么是 Reactor?

Reactor 是一个基于响应式编程范式的 Java 库,严格遵循 Reactive Streams 规范,专注于解决异步非阻塞场景下的数据流处理问题。其核心优势在于:

  • 高效利用系统资源(通过非阻塞 I/O)
  • 天然支持背压(Backpressure)机制
  • 简化异步代码逻辑(避免”回调地狱”)
  • 与 Spring 生态深度集成(如 Spring WebFlux、Spring Cloud Gateway)

二、响应式编程基础

2.1 核心思想

响应式编程以”数据流”和”变化传播”为核心,主要特点包括:

  • 声明式编程:关注”做什么”而非”怎么做”
  • 异步非阻塞:I/O 操作不阻塞线程,提高资源利用率
  • 事件驱动:基于事件触发数据流处理
  • 背压支持:消费者可向上游反馈处理能力,防止数据溢出

2.2 与传统编程的对比

传统同步编程 响应式编程
线程阻塞等待 I/O 线程非阻塞,可处理其他任务
命令式代码(步骤化) 声明式代码(链式操作)
需手动管理线程池 内置调度器管理线程
无背压机制,易过载 背压自动平衡生产/消费速度

三、Reactor 核心组件

Reactor 的核心是两个数据流类型:FluxMono,它们是所有响应式操作的基础。

3.1 Flux:处理 0 到 N 个元素

Flux代表一个异步序列,可发射:

  • 0 个、1 个或多个元素
  • 一个完成信号(onComplete)
  • 一个错误信号(onError)

适用场景:集合处理、批量数据查询、消息队列消费等多元素场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.List;

public class FluxDemo {
public static void main(String[] args) {
// 1. 直接创建元素序列
Flux<String> flux1 = Flux.just("Java", "Python", "Go");

// 2. 从集合创建
List<Integer> numbers = Arrays.asList(1, 2, 3);
Flux<Integer> flux2 = Flux.fromIterable(numbers);

// 3. 生成范围序列
Flux<Integer> flux3 = Flux.range(1, 5); // 1,2,3,4,5

// 4. 订阅消费
flux3.subscribe(
value -> System.out.println("接收: " + value), // 处理元素
error -> System.err.println("错误: " + error), // 处理错误
() -> System.out.println("序列结束") // 处理完成
);
}
}

3.2 Mono:处理 0 或 1 个元素

Mono代表一个异步结果,可发射:

  • 0 个或 1 个元素
  • 一个完成信号(onComplete)
  • 一个错误信号(onError)

适用场景:单个结果查询、HTTP 请求返回、异步任务结果等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import reactor.core.publisher.Mono;

public class MonoDemo {
public static void main(String[] args) {
// 1. 单个元素
Mono<String> mono1 = Mono.just("Hello Reactor");

// 2. 空序列
Mono<Void> mono2 = Mono.empty();

// 3. 延迟计算(Supplier)
Mono<Long> mono3 = Mono.fromSupplier(() -> {
System.out.println("开始计算...");
return System.currentTimeMillis();
});

// 4. 订阅消费
mono3.subscribe(
result -> System.out.println("结果: " + result),
error -> System.err.println("错误: " + error),
() -> System.out.println("处理完成")
);
}
}

3.3 核心接口关系

Reactor 基于 Reactive Streams 规范,核心接口关系如下:

1
2
3
4
5
Publisher (Flux/Mono) → 可被订阅的数据流

Subscriber → 订阅者,接收数据流

Subscription → 连接Publisher与Subscriber,用于背压控制

四、常用操作符详解

操作符是 Reactor 处理数据流的核心工具,掌握常用操作符是学好 Reactor 的关键。

4.1 创建型操作符

用于生成数据流:

操作符 功能 示例
just() 直接发射元素 Flux.just(1,2,3)
fromIterable() 从集合创建 Flux.fromIterable(list)
range() 生成整数范围 Flux.range(1, 5)
create() 手动控制发射 Flux.create(sink -> { ... })

create()示例(整合回调 API):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Flux.create(sink -> {
// 模拟回调式API
new CallbackApi() {
@Override
public void onData(String data) {
sink.next(data); // 发射数据
}

@Override
public void onComplete() {
sink.complete(); // 结束序列
}

@Override
public void onError(Throwable e) {
sink.error(e); // 发射错误
}
};
});

4.2 转换型操作符

操作符 功能 注意事项
map() 同步转换元素(1:1) 适合轻量转换
flatMap() 转换为新流(1:N) 结果无序,适合异步转换
concatMap() 转换为新流并顺序拼接 结果有序,性能略低

flatMap()示例

1
2
3
4
// 将每个用户ID转换为用户详情查询
Flux.just(1L, 2L, 3L)
.flatMap(userId -> userService.findById(userId)) // 返回Mono<User>
.subscribe(user -> System.out.println(user.getName()));

4.3 过滤型操作符

1
2
3
4
5
6
Flux.range(1, 10)
.filter(i -> i % 2 == 0) // 保留偶数
.skip(2) // 跳过前2个
.take(2) // 取前2个
.distinct() // 去重
.subscribe(System.out::println); // 输出6,8

4.4 错误处理操作符

1
2
3
4
5
6
7
Flux.just(1, 2)
.concatWith(Flux.error(new RuntimeException("模拟错误")))
.onErrorResume(error -> { // 错误时切换到备用流
System.err.println("捕获错误: " + error.getMessage());
return Flux.just(3, 4);
})
.subscribe(System.out::println); // 1,2,3,4

常用错误处理操作符:

  • onErrorReturn():返回默认值
  • onErrorResume():切换到备用流
  • retry(3):重试 3 次

五、背压(Backpressure)机制

背压是 Reactor 的核心特性,用于解决”生产者速度远快于消费者”的问题。

5.1 工作原理

  1. 消费者通过Subscription向上游发送request(n)信号,声明可处理 n 个元素
  2. 生产者根据请求数量调整发射速度
  3. 若生产者无法减速,可通过背压策略处理超额数据

5.2 背压策略

1
2
3
4
Flux.interval(Duration.ofMillis(100)) // 快速生产者
.onBackpressureDrop(dropped -> System.out.println("丢弃: " + dropped)) // 丢弃策略
.delayElements(Duration.ofMillis(300)) // 慢速消费者
.subscribe(System.out::println);

常用策略:

  • onBackpressureBuffer():缓冲(默认,满则报错)
  • onBackpressureDrop():丢弃超额元素
  • onBackpressureLatest():保留最新元素
  • onBackpressureError():直接报错

六、线程调度(Schedulers)

Reactor 通过Schedulers管理线程,实现异步操作的线程切换。

6.1 常用调度器

调度器 特点 适用场景
immediate() 当前线程 同步操作
single() 单线程(可重用) 轻量异步任务
boundedElastic() 弹性线程池(有上限) I/O 密集型操作
parallel() 固定大小线程池(CPU 核心数) CPU 密集型操作

6.2 线程切换方法

  • subscribeOn():指定生产者执行的线程(整个流的源头)
  • publishOn():指定后续操作的执行线程(影响下游)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Flux.range(1, 3)
.map(i -> {
System.out.println("map线程: " + Thread.currentThread().getName());
return i * 2;
})
.publishOn(Schedulers.boundedElastic()) // 切换下游线程
.flatMap(i -> {
System.out.println("flatMap线程: " + Thread.currentThread().getName());
return Mono.just(i + 1);
})
.subscribeOn(Schedulers.parallel()) // 源头线程
.subscribe(result -> {
System.out.println("订阅线程: " + Thread.currentThread().getName());
});

七、Spring 生态中的应用

7.1 Spring WebFlux 控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RestController
@RequestMapping("/users")
public class UserController {

private final UserService userService;

public UserController(UserService userService) {
this.userService = userService;
}

@GetMapping
public Flux<User> getAllUsers() {
return userService.findAll();
}

@GetMapping("/{id}")
public Mono<ResponseEntity<User>> getUserById(@PathVariable Long id) {
return userService.findById(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
}

7.2 响应式数据访问(R2DBC)

1
2
3
4
5
6
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
// 响应式查询
Flux<User> findByAgeGreaterThan(int age);

Mono<User> findByUsername(String username);
}

八、测试 Reactor 代码

使用StepVerifier验证响应式流的行为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import reactor.test.StepVerifier;

public class ReactorTest {
@Test
public void testFlux() {
Flux<Integer> flux = Flux.just(1, 2, 3).map(i -> i * 2);

StepVerifier.create(flux)
.expectNext(2) // 验证第一个元素
.expectNext(4) // 验证第二个元素
.expectNext(6) // 验证第三个元素
.verifyComplete(); // 验证正常完成
}
}