UnicastProcessor深度解析:单订阅者处理器的工作原理与实践 一、核心概念与设计目标 核心特性: 单订阅者保障 :严格限制只能存在一个订阅者,防止多订阅导致的背压失效 自定义背压队列 :内置可配置大小的队列实现(默认无界队列) 多线程安全 :支持多生产者线程并发推送数据 完成状态传播 :任一生产者完成时自动触发下游完成信号 典型应用场景: // 书籍示例:多线程数据收集器
UnicastProcessor < Integer > processor = UnicastProcessor . create ();
Flux . from ( processor )
. subscribe ( System . out :: println );
// 生产者线程1
new Thread (() -> {
for ( int i = 0 ; i < 10 ; i ++ ) {
processor . onNext ( i );
}
processor . onComplete ();
}). start ();
// 生产者线程2
new Thread (() -> {
processor . onNext ( 100 );
processor . onComplete ();
}). start ();
二、原理机制详解 关键设计思想 采用**生产者-消费者模型**实现数据流控制:
生产端 :通过原子变量维护订阅者数量 消费端 :使用定制化队列实现背压缓冲 状态同步 :借助序列化Sink保证线程安全 状态流转图 stateDiagram
[*] --> Idle
Idle --> Active: subscribe()
Active --> Draining: onNext()
Draining --> Completed: onComplete()
Draining --> Error: onError()
Completed/Error --> [*] 三、源码深度解析(Reactor 3.x实现) 核心类结构 public final class UnicastProcessor < T >
extends Processor < T , T >
implements Fuseable . QueueSubscription < T > {
private final Queue < T > queue ; // 底层背压队列
private final AtomicLong requested = new AtomicLong (); // 请求计数器
private final AtomicReference < Subscription > subscription = new AtomicReference <> ();
private final AtomicBoolean done = new AtomicBoolean (); // 完成状态标记
}
关键方法解读 1. 订阅控制逻辑 public void subscribe ( Subscriber <? super T > s ) {
if ( subscription . compareAndSet ( null , s )) {
s . onSubscribe ( new SerializedSink <> ( this ));
} else {
EmptySubscription . error ( s , new IllegalStateException ( "Only one subscriber allowed" ));
}
}
使用CAS操作保证单订阅者 通过SerializedSink实现线程安全的数据提交 2. 数据下发机制 public void onNext ( T t ) {
if ( done . get ()) {
Operators . onNextDropped ( t );
return ;
}
if ( queue . offer ( t )) {
drain ();
} else {
Operators . onNextDropped ( t );
}
}
无界队列模式下直接入队 有界队列满时触发onNextDropped 3. 序列化Sink实现 static final class SerializedSink < T > implements Sink < T > {
private final UnicastProcessor < T > parent ;
private final Queue < T > queue = Queues . < T > unbounded (). get ();
@Override
public void next ( T t ) {
queue . offer ( t );
parent . drain ();
}
// 错误/完成信号同步处理
@Override
public void error ( Throwable t ) {
parent . error ( t );
}
@Override
public void complete () {
parent . complete ();
}
}
通过内部队列实现线程安全的数据缓冲 drain()方法触发实际的数据下发 四、性能对比与适用场景 特性 UnicastProcessor DirectProcessor SynchronousSink 订阅者数量 严格限制1个 无限制 无限制 背压支持 完整支持 无背压 手动信号控制 内存占用 O(N)(队列大小) O(1) O(1) 典型场景 多线程数据聚合 简单数据转发 精确背压控制
五、实战示例 场景:多线程日志聚合 UnicastProcessor < LogEvent > processor = UnicastProcessor . create ();
Flux . from ( processor )
. bufferTimeout ( 100 , Duration . ofSeconds ( 5 ))
. subscribe ( batch -> {
logService . batchSave ( batch );
});
// 生产者线程池
ExecutorService executor = Executors . newFixedThreadPool ( 4 );
for ( int i = 0 ; i < 4 ; i ++ ) {
executor . submit (() -> {
for ( int j = 0 ; j < 100 ; j ++ ) {
processor . onNext ( new LogEvent ( "Thread-" + Thread . currentThread (). getId (), j ));
}
processor . onComplete ();
});
}
六、最佳实践建议 严格单订阅约束 :确保只存在一个下游订阅者 合理配置队列大小 :根据下游处理能力设置队列容量 及时处理完成信号 :避免内存泄漏 避免嵌套使用 :防止背压信号传递失效 七、与DirectProcessor的对比 特性 UnicastProcessor DirectProcessor 订阅者数量 1个 多个 背压实现 基于队列的回压 无背压 内存风险 队列溢出可能导致OOM 无缓冲,但可能压垮下游
2025年6月5日 02:46:30 2025年6月5日 02:46:30