在现代 Java 开发中,异步非阻塞编程已成为处理高并发场景的核心能力。Reactor 作为响应式编程的主流实现,不仅是 Spring WebFlux 的底层依赖,更能帮助开发者构建高效、可扩展的数据流处理系统。本文将从基础到实战,全面解析 Reactor 的核心概念与使用方法。
一、什么是 Reactor?
Reactor 是一个基于响应式编程范式的 Java 库,严格遵循 Reactive Streams 规范,专注于解决异步非阻塞场景下的数据流处理问题。其核心优势在于:
- 高效利用系统资源(通过非阻塞 I/O)
- 天然支持背压(Backpressure)机制
- 简化异步代码逻辑(避免”回调地狱”)
- 与 Spring 生态深度集成(如 Spring WebFlux、Spring Cloud Gateway)
二、响应式编程基础
2.1 核心思想
响应式编程以”数据流”和”变化传播”为核心,主要特点包括:
- 声明式编程:关注”做什么”而非”怎么做”
- 异步非阻塞:I/O 操作不阻塞线程,提高资源利用率
- 事件驱动:基于事件触发数据流处理
- 背压支持:消费者可向上游反馈处理能力,防止数据溢出
2.2 与传统编程的对比
| 传统同步编程 |
响应式编程 |
| 线程阻塞等待 I/O |
线程非阻塞,可处理其他任务 |
| 命令式代码(步骤化) |
声明式代码(链式操作) |
| 需手动管理线程池 |
内置调度器管理线程 |
| 无背压机制,易过载 |
背压自动平衡生产/消费速度 |
三、Reactor 核心组件
Reactor 的核心是两个数据流类型:Flux和Mono,它们是所有响应式操作的基础。
3.1 Flux:处理 0 到 N 个元素
Flux代表一个异步序列,可发射:
- 0 个、1 个或多个元素
- 一个完成信号(onComplete)
- 一个错误信号(onError)
适用场景:集合处理、批量数据查询、消息队列消费等多元素场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import reactor.core.publisher.Flux; import java.util.Arrays; import java.util.List;
public class FluxDemo { public static void main(String[] args) { Flux<String> flux1 = Flux.just("Java", "Python", "Go");
List<Integer> numbers = Arrays.asList(1, 2, 3); Flux<Integer> flux2 = Flux.fromIterable(numbers);
Flux<Integer> flux3 = Flux.range(1, 5);
flux3.subscribe( value -> System.out.println("接收: " + value), error -> System.err.println("错误: " + error), () -> System.out.println("序列结束") ); } }
|
3.2 Mono:处理 0 或 1 个元素
Mono代表一个异步结果,可发射:
- 0 个或 1 个元素
- 一个完成信号(onComplete)
- 一个错误信号(onError)
适用场景:单个结果查询、HTTP 请求返回、异步任务结果等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import reactor.core.publisher.Mono;
public class MonoDemo { public static void main(String[] args) { Mono<String> mono1 = Mono.just("Hello Reactor");
Mono<Void> mono2 = Mono.empty();
Mono<Long> mono3 = Mono.fromSupplier(() -> { System.out.println("开始计算..."); return System.currentTimeMillis(); });
mono3.subscribe( result -> System.out.println("结果: " + result), error -> System.err.println("错误: " + error), () -> System.out.println("处理完成") ); } }
|
3.3 核心接口关系
Reactor 基于 Reactive Streams 规范,核心接口关系如下:
1 2 3 4 5
| Publisher (Flux/Mono) → 可被订阅的数据流 ↑ Subscriber → 订阅者,接收数据流 ↑ Subscription → 连接Publisher与Subscriber,用于背压控制
|
四、常用操作符详解
操作符是 Reactor 处理数据流的核心工具,掌握常用操作符是学好 Reactor 的关键。
4.1 创建型操作符
用于生成数据流:
| 操作符 |
功能 |
示例 |
just() |
直接发射元素 |
Flux.just(1,2,3) |
fromIterable() |
从集合创建 |
Flux.fromIterable(list) |
range() |
生成整数范围 |
Flux.range(1, 5) |
create() |
手动控制发射 |
Flux.create(sink -> { ... }) |
create()示例(整合回调 API):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Flux.create(sink -> { new CallbackApi() { @Override public void onData(String data) { sink.next(data); }
@Override public void onComplete() { sink.complete(); }
@Override public void onError(Throwable e) { sink.error(e); } }; });
|
4.2 转换型操作符
| 操作符 |
功能 |
注意事项 |
map() |
同步转换元素(1:1) |
适合轻量转换 |
flatMap() |
转换为新流(1:N) |
结果无序,适合异步转换 |
concatMap() |
转换为新流并顺序拼接 |
结果有序,性能略低 |
flatMap()示例:
1 2 3 4
| Flux.just(1L, 2L, 3L) .flatMap(userId -> userService.findById(userId)) .subscribe(user -> System.out.println(user.getName()));
|
4.3 过滤型操作符
1 2 3 4 5 6
| Flux.range(1, 10) .filter(i -> i % 2 == 0) .skip(2) .take(2) .distinct() .subscribe(System.out::println);
|
4.4 错误处理操作符
1 2 3 4 5 6 7
| Flux.just(1, 2) .concatWith(Flux.error(new RuntimeException("模拟错误"))) .onErrorResume(error -> { System.err.println("捕获错误: " + error.getMessage()); return Flux.just(3, 4); }) .subscribe(System.out::println);
|
常用错误处理操作符:
onErrorReturn():返回默认值
onErrorResume():切换到备用流
retry(3):重试 3 次
五、背压(Backpressure)机制
背压是 Reactor 的核心特性,用于解决”生产者速度远快于消费者”的问题。
5.1 工作原理
- 消费者通过
Subscription向上游发送request(n)信号,声明可处理 n 个元素
- 生产者根据请求数量调整发射速度
- 若生产者无法减速,可通过背压策略处理超额数据
5.2 背压策略
1 2 3 4
| Flux.interval(Duration.ofMillis(100)) .onBackpressureDrop(dropped -> System.out.println("丢弃: " + dropped)) .delayElements(Duration.ofMillis(300)) .subscribe(System.out::println);
|
常用策略:
onBackpressureBuffer():缓冲(默认,满则报错)
onBackpressureDrop():丢弃超额元素
onBackpressureLatest():保留最新元素
onBackpressureError():直接报错
六、线程调度(Schedulers)
Reactor 通过Schedulers管理线程,实现异步操作的线程切换。
6.1 常用调度器
| 调度器 |
特点 |
适用场景 |
immediate() |
当前线程 |
同步操作 |
single() |
单线程(可重用) |
轻量异步任务 |
boundedElastic() |
弹性线程池(有上限) |
I/O 密集型操作 |
parallel() |
固定大小线程池(CPU 核心数) |
CPU 密集型操作 |
6.2 线程切换方法
subscribeOn():指定生产者执行的线程(整个流的源头)
publishOn():指定后续操作的执行线程(影响下游)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Flux.range(1, 3) .map(i -> { System.out.println("map线程: " + Thread.currentThread().getName()); return i * 2; }) .publishOn(Schedulers.boundedElastic()) .flatMap(i -> { System.out.println("flatMap线程: " + Thread.currentThread().getName()); return Mono.just(i + 1); }) .subscribeOn(Schedulers.parallel()) .subscribe(result -> { System.out.println("订阅线程: " + Thread.currentThread().getName()); });
|
七、Spring 生态中的应用
7.1 Spring WebFlux 控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @RestController @RequestMapping("/users") public class UserController {
private final UserService userService;
public UserController(UserService userService) { this.userService = userService; }
@GetMapping public Flux<User> getAllUsers() { return userService.findAll(); }
@GetMapping("/{id}") public Mono<ResponseEntity<User>> getUserById(@PathVariable Long id) { return userService.findById(id) .map(ResponseEntity::ok) .defaultIfEmpty(ResponseEntity.notFound().build()); } }
|
7.2 响应式数据访问(R2DBC)
1 2 3 4 5 6
| public interface UserRepository extends ReactiveCrudRepository<User, Long> { Flux<User> findByAgeGreaterThan(int age);
Mono<User> findByUsername(String username); }
|
八、测试 Reactor 代码
使用StepVerifier验证响应式流的行为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import reactor.test.StepVerifier;
public class ReactorTest { @Test public void testFlux() { Flux<Integer> flux = Flux.just(1, 2, 3).map(i -> i * 2);
StepVerifier.create(flux) .expectNext(2) .expectNext(4) .expectNext(6) .verifyComplete(); } }
|