UnicastProcessor深度解析:单订阅者处理器的工作原理与实践
一、核心概念与设计目标
核心特性:
- 单订阅者保障:严格限制只能存在一个订阅者,防止多订阅导致的背压失效
- 自定义背压队列:内置可配置大小的队列实现(默认无界队列)
- 多线程安全:支持多生产者线程并发推送数据
- 完成状态传播:任一生产者完成时自动触发下游完成信号
典型应用场景:
// 书籍示例:多线程数据收集器
UnicastProcessor<Integer> processor = UnicastProcessor.create();
Flux.from(processor)
.subscribe(System.out::println);
// 生产者线程1
new Thread(() -> {
for (int i = 0; i < 10; i++) {
processor.onNext(i);
}
processor.onComplete();
}).start();
// 生产者线程2
new Thread(() -> {
processor.onNext(100);
processor.onComplete();
}).start();
二、原理机制详解
关键设计思想
采用**生产者-消费者模型**实现数据流控制:
- 生产端:通过原子变量维护订阅者数量
- 消费端:使用定制化队列实现背压缓冲
- 状态同步:借助序列化Sink保证线程安全
状态流转图
stateDiagram
[*] --> Idle
Idle --> Active: subscribe()
Active --> Draining: onNext()
Draining --> Completed: onComplete()
Draining --> Error: onError()
Completed/Error --> [*]
三、源码深度解析(Reactor 3.x实现)
核心类结构
public final class UnicastProcessor<T>
extends Processor<T, T>
implements Fuseable.QueueSubscription<T> {
private final Queue<T> queue; // 底层背压队列
private final AtomicLong requested = new AtomicLong(); // 请求计数器
private final AtomicReference<Subscription> subscription = new AtomicReference<>();
private final AtomicBoolean done = new AtomicBoolean(); // 完成状态标记
}
关键方法解读
1. 订阅控制逻辑
public void subscribe(Subscriber<? super T> s) {
if (subscription.compareAndSet(null, s)) {
s.onSubscribe(new SerializedSink<>(this));
} else {
EmptySubscription.error(s, new IllegalStateException("Only one subscriber allowed"));
}
}
- 使用CAS操作保证单订阅者
- 通过SerializedSink实现线程安全的数据提交
2. 数据下发机制
public void onNext(T t) {
if (done.get()) {
Operators.onNextDropped(t);
return;
}
if (queue.offer(t)) {
drain();
} else {
Operators.onNextDropped(t);
}
}
- 无界队列模式下直接入队
- 有界队列满时触发onNextDropped
3. 序列化Sink实现
static final class SerializedSink<T> implements Sink<T> {
private final UnicastProcessor<T> parent;
private final Queue<T> queue = Queues.<T>unbounded().get();
@Override
public void next(T t) {
queue.offer(t);
parent.drain();
}
// 错误/完成信号同步处理
@Override
public void error(Throwable t) {
parent.error(t);
}
@Override
public void complete() {
parent.complete();
}
}
- 通过内部队列实现线程安全的数据缓冲
- drain()方法触发实际的数据下发
四、性能对比与适用场景
特性 |
UnicastProcessor |
DirectProcessor |
SynchronousSink |
订阅者数量 |
严格限制1个 |
无限制 |
无限制 |
背压支持 |
完整支持 |
无背压 |
手动信号控制 |
内存占用 |
O(N)(队列大小) |
O(1) |
O(1) |
典型场景 |
多线程数据聚合 |
简单数据转发 |
精确背压控制 |
五、实战示例
场景:多线程日志聚合
UnicastProcessor<LogEvent> processor = UnicastProcessor.create();
Flux.from(processor)
.bufferTimeout(100, Duration.ofSeconds(5))
.subscribe(batch -> {
logService.batchSave(batch);
});
// 生产者线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
executor.submit(() -> {
for (int j = 0; j < 100; j++) {
processor.onNext(new LogEvent("Thread-" + Thread.currentThread().getId(), j));
}
processor.onComplete();
});
}
六、最佳实践建议
- 严格单订阅约束:确保只存在一个下游订阅者
- 合理配置队列大小:根据下游处理能力设置队列容量
- 及时处理完成信号:避免内存泄漏
- 避免嵌套使用:防止背压信号传递失效
七、与DirectProcessor的对比
特性 |
UnicastProcessor |
DirectProcessor |
订阅者数量 |
1个 |
多个 |
背压实现 |
基于队列的回压 |
无背压 |
内存风险 |
队列溢出可能导致OOM |
无缓冲,但可能压垮下游 |