经过两晚阅读RxJava源码的时间,终于把RxJava从观察者订阅被观察者、到被观察者发射订阅信息给观察者、到观察者解除订阅被观察者的信息,这一串基本流程通过流程图的形式体现了出来。所读源码版本是RxJava2.2.13,以下是流程图对应的示例代码。
/**
* rxjava2 流程示例代码
*/
private void rxJava2Flow() {
/**
* 被观察者的订阅,当观察者订阅了被观察者后,此回调将会被调用,发射器将会发送信号
*/
ObservableOnSubscribe<Integer> observableOnSubscribe = new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
System.out.println("已经订阅:subscribe,获取发射器");
emitter.onNext(1);
System.out.println("信号发射:"+1);
emitter.onNext(2);
System.out.println("信号发射:"+2);
emitter.onNext(3);
System.out.println("信号发射:"+3);
emitter.onNext(4);
System.out.println("信号发射:"+4);
emitter.onComplete();
System.out.println("信号发射:onComplete");
}
};
/**
* 创建被观察者,并带上被观察者的订阅
*/
Observable<Integer> observable = Observable.create(observableOnSubscribe);
/**
* 订阅解除器
*/
final Disposable[] disposable = new Disposable[1];
/**
* 创建观察者
*/
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
disposable[0] = d;
System.out.println("已经订阅:onSubscribe,获取解除器");
}
@Override
public void onNext(Integer integer) {
System.out.println("信号接收:onNext "+integer);
if (integer==2){
disposable[0].dispose();
System.out.println("解除订阅:dispose");
}
}
@Override
public void onError(Throwable e) {
System.out.println("信号接收:onError "+e.getMessage());
}
@Override
public void onComplete() {
System.out.println("信号接收:onComplete");
}
};
/**
* 订阅
*/
System.out.println("开始订阅:subscribe");
observable.subscribe(observer);
}
1、订阅
2、发射
3、解除
∞、备注
转载请注明出处。