0
点赞
收藏
分享

微信扫一扫

彻底搞清楚 RxJava 是什么东西


其实从rxjava14年出现到现在,我是去年从一个朋友那里听到的,特别是随着现在app项目越来越大,分层越来越不明确的情况下,rxjava出现了,以至于出现了rxandroid。其实如果你了解观察者模式的话,rxjava并没有你说的那么神秘。再次,我对rxjava并不崇拜,我的原则是怎么写代码简单,代码结构清晰,维护简单,就是好框架。

讲rxjava之前首先说一下Android mvp开发模式。

MVP的工作流程

  • Presenter负责逻辑的处理,
  • Model提供数据,
  • View负责显示。
    作为一种新的模式,在MVP中View并不直接使用Model,它们之间的通信是通过Presenter来进行的,所有的交互都发生在Presenter内部,而在MVC中View会从直接Model中读取数据而不是通过 Controller。

彻底搞清楚 RxJava 是什么东西_java


接下来说说rxjava


  • ​​RxJava 到底是什么​​
  • ​​RxJava 好在哪​​
  • ​​API 介绍和原理简析​​
  • ​​1. 概念:扩展的观察者模式​​
  • ​​观察者模式​​
  • ​​RxJava 的观察者模式​​
  • ​​2. 基本实现​​
  • ​​1) 创建 Observer​​
  • ​​2) 创建 Observable​​
  • ​​3) Subscribe (订阅)​​
  • ​​4) 场景示例​​
  • ​​a. 打印字符串数组​​
  • ​​b. 由 id 取得图片并显示​​
  • ​​3. 线程控制 —— Scheduler (一)​​
  • ​​1) Scheduler 的 API (一)​​
  • ​​2) Scheduler 的原理 (一)​​
  • ​​4. 变换​​
  • ​​1) API​​
  • ​​2) 变换的原理:lift()​​
  • ​​3) compose: 对 Observable 整体的变换​​
  • ​​5. 线程控制:Scheduler (二)​​
  • ​​1) Scheduler 的 API (二)​​
  • ​​2) Scheduler 的原理(二)​​
  • ​​3) 延伸:doOnSubscribe()​​
  • ​​RxJava 的适用场景和使用方式​​
  • ​​1. 与 Retrofit 的结合​​
  • ​​2. RxBinding​​
  • ​​3. 各种异步操作​​
  • ​​4. RxBus​​
  • ​​最后​​
  • ​​关于作者:​​
  • ​​为什么写这个?​​

如果你要了解rxjava是什么,由来,以及作用和原理,请点击上面的链接。

针对上面的问题,我们简单的了解下一些基本的概念。

什么是rxJava


一种帮助你做异步的框架. 类似于 AsyncTask. 但其灵活性和扩展性远远强于前者. 从能力上讲, 如果说 AsycnTask 是 DOS 操作系统, RxJava 是 Window 操作系统。


rxJava的好处


异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的​​AsyncTask​​ 和​​Handler​​ ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。




看下rxjava的例子

彻底搞清楚 RxJava 是什么东西_java_02


rxjava原理简析


我想大家听说过如下Java的都知道如下Java采用的是一种扩展的观察者模式实现的,何为观察者模式:观察者模式是一种一对多的依赖关系,当一个对象改变状态时,它会通知所有依赖者接受通知,并决定数据是否改变。




但是rxjava和传统的观察者模式又不完全相同,传统的观察者模式是涉及到两个对象观察者(​​Observer​​ )和被观察者(​​Observable​​ )。观察者通过将被观察 的对象加到自己的观察队列中,当被观察者发生改变时,就会通知观察者东西已经改变。

而rxJava中涉及到4个概念:​​Observable​​ (可观察者,即被观察者)、 ​​Observer​​ (观察者)、 ​​subscribe​​ (订阅)、事件。​​Observable​​ 和​​Observer​​ 通过 ​​subscribe()​​ 方法实现订阅关系,从而 ​​Observable​​ 可以在需要的时候发出事件来通知 ​​Observer​​​​数据刷新。​

注意:重点来了 


与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 ​​onNext()​​ (相当于 ​​onClick()​​ / ​​onEvent()​​)之外,还定义了两个特殊的事件:​​onCompleted()​

 和 ​​onError()​​。

​onCompleted()​

  • : 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的

​onNext()​

  • 发出时,需要触发

​onCompleted()​​​​onError()​

  • : 事件队列异常。在事件处理过程中出异常时,

​onError()​​ 注意:

在一个正确运行的事件序列中, ​​onCompleted()​​ 和 ​​onError()​​​​onCompleted()​​ 和 ​​onError()​​ 二者也是互斥的。在响应的队列中只能调用一个。

rxjava事件处理的模型图:


彻底搞清楚 RxJava 是什么东西_java_03


rxjava的基本实现


1) 创建 Observer(被观察者对象)



//Observable部分,被观察者部分
Observable<String> myObservable=Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("我是被观察的对象");
subscriber.onCompleted();
}
});

2) 创建Subscriber(观察者对象)



//Subscriber部分,观察者部分
Subscriber<String> mySubscriber=new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
text.setText(s);
}
};

3) Observer和Subscriber关联



myObservable.subscribe(mySubscriber);


这样就完成了一个简单的rxjava,是不是很简单。


注意:如果你用的是android studio作为ide工具的话,请务必添加rxjava依赖



彻底搞清楚 RxJava 是什么东西_ide_04



除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出Subscriber 。



其实如果看上面的写法,代码是显得比较难看的,这是为了方便大家理解rxjava的订阅者模式。



其实上面的代码可以这么写:



Observable.just("Hello, world!")  
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});

使用java8的lambda可以使代码更简洁

Observable.just("Hello, world!")  
.subscribe(s -> System.out.println(s));



然而如果你认为rxjava只有这个用处,那么也什么牛逼的,

在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: ​​Scheduler​​ 。


 Scheduler (线程调度器)



线程控制与调度


 RxJava 遵循的是线程不变的原则,即:在哪个线程调用 

​subscribe()​

,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。而如果要实现线程的调度,就需要scheduler(线程调度器)。


RxJava 已经内置了几个 ​​Scheduler​​ ,它们已经适合大多数的使用场景:

​Schedulers.immediate()​

  • : 直接在当前线程运行,相当于不指定线程。这是默认的

​Scheduler​

​Schedulers.newThread()​

  • : 总是启用新线程,并在新线程执行操作。

​Schedulers.io()​

  • : I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的

​Scheduler​

  • 。行为模式和

​newThread()​

  • 差不多,区别在于

​io()​

  • 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下

​io()​

​newThread()​

  • 更有效率。不要把计算工作放在

​io()​​​​Schedulers.computation()​

  • : 计算所使用的

​Scheduler​

  • 。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个

​Scheduler​

  • 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在

​computation()​

  • 另外, Android 还有一个专用的

​AndroidSchedulers.mainThread()​

  • ,它指定的操作将在 Android 主线程运行。

Sceeduler默认给我们提供了​​subscribeOn()​​ 和 ​​observeOn()​​ 两个方法来对线程进行控制


举个例子:



Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
}
});


上面这段代码中,由于 

​subscribeOn(Schedulers.io())​

 的指定,被创建的事件的内容  ​​1​

、 ​​2​

、 ​​3​

、 ​​4​

 将会在 IO 线程发出;而由于

​observeOn(AndroidScheculers.mainThread()​​) 的指定,因此 ​​subscriber​​ 数字的打印将发生在主线程 。事实上,这种在 ​​subscribe()​​ 

之前写上两句 ​​subscribeOn(Scheduler.io())​​ 和 ​​observeOn(AndroidSchedulers.mainThread())​​ 的使用方式非常常见,


它适用于多数的 『后台线程取数据,主线程显示』的程序策略。





说到这里,有一个常用的场景:加载几十个图片到UI上,这里说说rxjava的写法



int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});


这样,加载图片发生在UI线程,而设置显示放到子线程出来,这样就不会出现卡顿。



变换



RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。



来看一个例子:



Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});

这里出现了一个  ​​Func1​

 的类。它和  ​​Action1​

 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。  ​​Func1​

 和  ​​Action​

的区别在于,  ​​Func1​

 包装的是有返回值的方法。 ​​FuncX​

 和 ​​ActionX​

 的区别在  ​​FuncX​

 包装的是有返回值的方法。

通过上面的代码我们看到:

​map()​

 方法将参数中的  ​​String​

 对象转换成一个  ​​Bitmap​

 对象后返回,而在经过  ​​map()​

 方法后,事件的参数类型也由  ​​String​

转为了  ​​Bitmap。这就是最长久的转换。​

​map(): 事件对象的直接变换示意图:​

彻底搞清楚 RxJava 是什么东西_ide_05

​flatMap(): 这是一个很有用但非常难理解的变换​

​首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的属性(我选择属性,是因为如果对象可以打印,你们单个属性肯定不是问题)。​




Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);

写法也很简单,看得也很明白。



flatmap运行原理图:




彻底搞清楚 RxJava 是什么东西_java_06


变换的原理:lift()

这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法:​​lift(Operator)​​。首先看一下 ​​lift()​​ 的内部实现(仅核心代码):


// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}








举报

相关推荐

0 条评论