0
点赞
收藏
分享

微信扫一扫

RxJava 入门篇 (二) -- 关键的类

kmoon_b426 2022-09-14 阅读 172


文章目录

  • ​​1. Observable​​
  • ​​2. Observer​​
  • ​​3. 实现 Observable 和 Observer​​
  • ​​4. Subject​​
  • ​​4.1 PublishSubject​​
  • ​​4.2 ReplaySubject​​
  • ​​4.3 BehaviorSubject​​
  • ​​4.4 AsyncSubject​​
  • ​​5. 隐含的规则​​
  • ​​6. 代码示例​​
  • ​​6.1 创建 Observable 并发射数据​​
  • ​​6.2 创建 Observer 订阅 Observable 并接收数据:​​
  • ​​6.3 把 Subject 当作 Observer 传入 subscribe() 中​​


​Rx​​ 有两个最基础的类型,和其他一些扩展这两种类型的类。两个核心的类为:

​​Observable​​ 和

​​Observer​​。

​​Subject​​ 是同时继承了

​Observable​​ 和

​Observer​​。

​Rx​​​ 是在 ​​Observer​​​ 模式之上建立起来的。这种模式很常见,在 ​​Java​​​ 中有很多地方都使用了该模式,比如 ​​JavaFx​​​ 中的 ​​EventHandler​​​。 这些简单的使用方式和 ​​Rx​​ 对比有如下区别:

  • 使用​​event handler​​ 来处理事件很难组合使用
  • 无法延时处理查询事件
  • 可能会导致内存泄露
  • 没有标准的标示完成的方式
  • 需要手工的来处理并行和多线程

1. Observable

​Observable​​​ 是第一个核心类。该类包含了 ​​Rx​​ 中的很多实现,以及 所有核心的操作函数(operator or 操作符)。在本系列教程中会逐步介绍每个操作函数。现在我们只需要理解 ​​subscribe​​ 函数即可,下面是该函数的一种定义:

public final Subscription subscribe(Subscriber<? super T> subscriber)

该函数是用来接收 ​​observable​​​ 发射的事件的。当事件被发射后,他们就丢给了 ​​subscriber​​​, ​​subscriber​​​ 是用来处理事件的实现。这里的 ​​Subscriber​​​ 参数实现了 ​​Observer​​ 接口

一个 ​​Observable​​ 发射 三种类型的事件:

  • ​Values​​(数据)
  • ​完成状态​​​,告诉​​Subscriber​​ 事件(数据) 发射完毕,没有其他数据了
  • ​Error​​, 错误状态,如果在发射数据的过程中出现错误了,会发送该事件

2. Observer

Subscriber​Observer​ 的一个实现。 ​​Subscriber​​​ 实现了其它一些额外的功能,可以作为我们实现 ​​Observer​​​ 的基类。现在先看看 ​​Observer​​ 的接口定义:

interface Observer<T> {
void onCompleted();
void onError(java.lang.Throwable e);
void onNext(T t);
}

每次 ​​Observable​​​ 发射事件的时候就会执行这三个对应的函数。​​Observer​​​ 的 ​​onNext​​​ 函数会被调用 ​​0​​​ 次或者 ​​多次​​​,然后会调用 ​​onCompleted​​​ 或者 ​​onError​​​。在 ​​onCompleted​​​ 或者 ​​onError​​ 发生以后就不会再有其他事件发射出来了。

在使用 ​​Rx​​​ 开发的过程中,你会看到很多 ​​Observable​​​,但是 ​​Observer​​​ 出场的时候很少。但是理解 ​​Observer​​​ 的概念是非常重要的,虽然有很多简写方式来帮助更加简洁的使用 ​​Observer​

3. 实现 Observable 和 Observer

你可以手工的实现 ​​Observer​​​ 或者扩展 ​​Observable​​​。 在真实场景中并不需要这样做,​​Rx​​​ 已经提供了很多可以直接使用的工厂方法了。使用 ​​Rx​​​ 提供的工具来创建 ​​Observable​​​ 和 ​​Observer​​ 比手工实现要更加安全和简洁。

要订阅到一个 ​​Observable​​​,并不需要提供一个 ​​Observer​​​ 示例。​​subscribe​​​ 函数有各种重载方法可以使用,你可以只订阅 ​​onNext​​​ 事件,有可以只订阅 ​​onError​​​ 事件,这样就不用提供 ​​Observer​​​ 对象就可以接受事件了。每次只需要提供你关心的函数即可,例如 如果你不关心 ​​onError​​​ 和 ​​onCompleted​​​,则只提供 ​​onNext​​ 来接收每次发送的数据即可。

配合 ​​Java 8 的 Lambda 表达式​​​则使用起来代码看起来会更加简洁,所以本系列示例代码会使用 ​​lambda​​​ 表达式,如果你不了解的话,可以先看看 ​​掌握 Java 8 Lambda 表达式​​。

4. Subject

官方文档:​​Subject​​ 可以看成是一个桥梁或者代理,在某些 ​​ReactiveX​​​ 实现中(如 ​​RxJava​​​),它同时充当了 ​​Observer​​​ 和 ​​Observable​​​ 的角色。因为它是一个 ​​Observer​​,它可以订阅一个或多个 Observable​​;又因为它是一个​​Observable​​,它可以转发它收到(Observe)的数据,也可以发射新的数据

​Subject​Observable 的一个扩展,同时还实现了 Observer 接口,总结就是:

Subject 可以像 ​Observer​ 一样接收事件,同时还可以像 ​Observable​ 一样把接收到的事件再发射出去

  • 它可以充当​​Observable​
  • 它可以充当​​Observer​
  • 是**​​Observable​​​ 和​​Observer​​ 之间的桥梁**

这种特性非常适合 ​​Rx​​​ 中的接入点,当你的事件来至于 ​​Rx​​​ 框架之外的代码的时候,你可以把这些数据先放到 ​​Subject​​​ 中,然后再把 ​​Subject​​​ 转换为一个 ​​Observable​​​,就可以在 ​​Rx​​ 中使用它们了。你可以把 Subject 当做 ​Rx​ 中的 事件管道

​Subject​​ 有两个参数类型:输入参数和输出参数。这样设计是为了抽象 而不是为了转换数据类型。转换数据应该使用转换操作函数来完成,后面我们将介绍各种操作函数。

  • AsyncSubject
  • BehaviorSubject
  • PublishSubject
  • ReplaySubject

4.1 PublishSubject

​PublishSubject​​​ 比较容易理解,相对比其他 ​​Subject​​​ 常用,它的 ​​Observer​​​ 只会接收到 ​​PublishSubject​​ 被订阅之后发送的数据。

示例代码如下:

PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("publishSubject1");
publishSubject.onNext("publishSubject2");
publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
System.out.println("onNext = " + s);
}
});
publishSubject.onNext("publishSubject3");
publishSubject.onNext("publishSubject4");

输出结果:

onNext = publishSubject3
onNext = publishSubject4

Another sample code:

PublishSubject<Integer> subject = PublishSubject.create();
subject.onNext(1);
subject.subscribe(System.out::println);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);

结果:

2
3
4

上面的 System.out::println 是 ​​Lambda 表达式中的函数引用​​,如果表达式代码块只有一个函数调用,则可以直接使用函数引用来简化代码

可以看到,数据 ​​1​​​ 并没有打印出来,原因是当我们订阅到 ​​subject​​​ 的时候,​​1​​​ 已经发射出去了。当订阅到 ​​subject​​​ 后就开始接收 发射到 ​​subject​​ 中的数据了。

这是我们初次看到如何使用 ​​subscribe​​​ 函数,值得详细研究下是如何用的。 这里我们使用了一个重载的参数只有一个 ​​Function​​​ 类型。这个参数 ​​Function​​​ 接收一个参数 ​​Integer​​​ 并且没有返回值。 没有返回值的 ​​Function​​​ 在 ​​Rx​​​ 中被称之为 ​​action​​​。 可以使用下面几种方式来提供这个 ​​Function​​:

  • 提供一个​​Action1​​ 的实现对象
  • 使用​​Lambda​​ 表达式 实现
  • 使用符合该接口定义类型的​​Lambda 表达式函数引用​​​。这里​​System.out::println​​​ 函数可以接受一个​​Object​​​ 对象,符合​​Action​​​ 的定义(​​接受一个参数并没有返回值​​​),所以我们可以把该函数作为函数引用使用。​​subscribe​​​ 将会使用他收到的值作为​​println​​​ 函数的参数来调用​​println​​ 函数

4.2 ReplaySubject

​ReplaySubject​​ 可以缓存所有发射给它的数据。当一个新的订阅者订阅的时候,缓存的所有数据都会发射给这个订阅者。 由于使用了缓存,所以每个订阅者都会收到所有的数据:

ReplaySubject<String> replaySubject = ReplaySubject.create();
// 创建默认初始缓存容量大小为16的ReplaySubject,当数据条目超过16会重新分配内存空间,使用这种方式,不论ReplaySubject何时被订阅,Observer都能接收到数据
// replaySubject =
// ReplaySubject.create(100);//创建指定初始缓存容量大小为100的ReplaySubject
// replaySubject = ReplaySubject.createWithSize(2);//只缓存订阅前最后发送的2条数据
// replaySubject=ReplaySubject.createWithTime(1,TimeUnit.SECONDS,Schedulers.computation());
// replaySubject被订阅前的前1秒内发送的数据才能被接收
replaySubject.onNext("replaySubject:pre1");
replaySubject.onNext("replaySubject:pre2");
replaySubject.onNext("replaySubject:pre3");
replaySubject.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("replaySubject:" + s);
}
});
replaySubject.onNext("replaySubject:after1");
replaySubject.onNext("replaySubject:after2");

结果

replaySubject:replaySubject:pre1
replaySubject:replaySubject:pre2
replaySubject:replaySubject:pre3
replaySubject:replaySubject:after1
replaySubject:replaySubject:after2

不管是何时订阅的,每个订阅者都收到了所有的数据。

Another sample code:

ReplaySubject<Integer> s = ReplaySubject.create();  
s.subscribe(v -> System.out.println("Early:" + v));
s.onNext(0);
s.onNext(1);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(2);

Result:

Early:0
Early:1
Late: 0
Late: 1
Early:2
Late: 2

缓存所有的数据并不是一个十分理想的情况,如果 ​​Observable​​​ 事件流运行很长时间,则缓存所有的数据会消耗很多内存。可以限制缓存数据的数量和时间。 ​​ReplaySubject.createWithSize​​​ 限制缓存多少个数据;而 ​​ReplaySubject.createWithTime​​ 限制一个数据可以在缓存中保留多长时间

ReplaySubject<Integer> s = ReplaySubject.createWithSize(2);
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);

Result:

Late: 1
Late: 2
Late: 3

由于指定只缓存两个数据,所以当订阅的时候第一个数据 0 就收不到了。 限制缓存的时间也是一样的情况:

ReplaySubject<Integer> s = ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS,
Schedulers.immediate());
s.onNext(0);
Thread.sleep(100);
s.onNext(1);
Thread.sleep(100);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);

Result;

Late: 1
Late: 2
Late: 3

使用时间缓存创建 ​​ReplaySubject​​​ 需要指定一个 ​​Scheduler​​​, ​​Scheduler​​​ 是 ​​Rx​​ 中保持时间的方式。现在可以假装它不存在,不用关心它。

​ReplaySubject.createWithTimeAndSize​​ 则可以同时限制时间和个数。

4.3 BehaviorSubject

​BehaviorSubject​​​ 会接收到​​BehaviorSubject​​​被订阅之前的最后一个数据,再接收其他发射过来的数据,如果​​BehaviorSubject​​被订阅之前没有发送任何数据,则会发送一个默认数据。

等同于限制 ReplaySubject 的个数为 1 的情况。在创建的时候可以指定一个初始值,这样可以确保党订阅者订阅的时候可以立刻收到一个值。

BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);

Result:

Late: 2
Late: 3

下面的示例只是打印出 ​​Completed​​​, 由于最后一个事件就是 ​​Completed​​。

BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
s.subscribe(
v -> System.out.println("Late: " + v),
e -> System.out.println("Error"),
() -> System.out.println("Completed")
);

Result:

Completed

这里使用了 subscribe 函数的另外一种重载形式,接受三个参数。

下面使用了默认初始化值,如果订阅者的发射数据之前就订阅了,则会收到这个初始化的值:

BehaviorSubject<Integer> s = BehaviorSubject.create(0);
s.subscribe(v -> System.out.println(v));
s.onNext(1);

Result:

0
1

由于 ​​BehaviorSubject​​​ 的定义就是总是有可用的数据,所以一般都会使用初始化值来创建 ​​BehaviorSubject​​.

完整示例代码如下:

BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
behaviorSubject.onNext("behaviorSubject1");
behaviorSubject.onNext("behaviorSubject2");
behaviorSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

LogUtil.log("behaviorSubject:complete");
}

@Override
public void onError(Throwable e) {

LogUtil.log("behaviorSubject:error");
}

@Override
public void onNext(String s) {

LogUtil.log("behaviorSubject:"+s);
}
});

behaviorSubject.onNext("behaviorSubject3");
behaviorSubject.onNext("behaviorSubject4");

结果

behaviorSubject2
behaviorSubject3
behaviorSubject4

如果在​​behaviorSubject.subscribe()​​​之前不发送​​behaviorSubject1​​​、​​behaviorSubject2​​​,则​​Observer​​​会先接收到​​default​​​,再接收​​behaviorSubject3​​​、​​behaviorSubject4​

注意跟​​AsyncSubject​​​的区别,​​AsyncSubject​​要手动调用​onCompleted()​​​,且它的​​Observer​​​会接收到**​​onCompleted()​​​前发送的最后一个数据,之后不会再接收数据**,而​​BehaviorSubject​​​不需手动调用​​onCompleted()​​​,它的​​Observer​​​接收的是​​BehaviorSubject​​被订阅前发送的最后一个数据,两个的分界点不一样,且之后还会继续接收数据。

4.4 AsyncSubject

​Observer​​​会接收​​AsyncSubject​​​的​​onComplete()​​​之前的最后一个数据,如果因异常而终止,​​AsyncSubject​​将不会释放任何数据,但是会向Observer传递一个异常通知。示例代码如下:

AsyncSubject<String> asyncSubject = AsyncSubject.create();
asyncSubject.onNext("asyncSubject1");
asyncSubject.onNext("asyncSubject2");
asyncSubject.onNext("asyncSubject3");
asyncSubject.onCompleted();
asyncSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

LogUtil.log("asyncSubject onCompleted"); //输出 asyncSubject onCompleted
}

@Override
public void onError(Throwable e) {

LogUtil.log("asyncSubject onError"); //不输出(异常才会输出)
}

@Override
public void onNext(String s) {

LogUtil.log("asyncSubject:"+s); //输出asyncSubject:asyncSubject3
}
});

输出结果:

asyncSubject3

以上代码,​​Observer​​​只会接收​​asyncSubject​​​的​​onCompleted()​​​被调用前的最后一个数据,即​​"asyncSubject3"​​​,如果不调用​​onCompleted()​​​,​​Subscriber​​将不接收任何数据。

Lambda 表达式 sample:

AsyncSubject<Integer> s = AsyncSubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();

Result:

1

5. 隐含的规则

如果你把 ​​Subject​​​ 当作一个 ​​Subscriber​​ 使用,不要从多个线程中调用它的​​onNext​​方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给​Subject​的结果增加了不确定性。要避免此类问题,官方提出了“串行化”,你可以将 ​​Subject​​​ 转换为一个 ​​SerializedSubject​​ ,类似于这样:

SerializedSubject<String, Integer> ser = new SerializedSubject(publishSubject);

​Rx​​ 中有一些隐含的规则在代码中并不太容易看到。一个重要的规则就是:

当一个事件流结束(onError 或者 ​onCompleted​ 都会导致事件流结束)后就不会发射任何数据了

这些 ​​Subject​​​ 的实现都遵守这个规则,​​subscribe​​ 函数也拒绝违反该规则的情况。

Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onCompleted();
s.onNext(1);
s.onNext(2);

结果

0

但是在 ​​Rx​​​ 实现中并没有完全确保这个规则,所以你在使用 ​​Rx​​ 的过程中要注意遵守该规则,否则会出现意料不到的情况。

6. 代码示例

6.1 创建 Observable 并发射数据

       // 1.创建被观察者 Observable
@SuppressWarnings("deprecation")
Observable<String> observable = Observable.create(new OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
// TODO Auto-generated method stub
subscriber.onNext("Hello World!"); // 发送数据
subscriber.onCompleted();// 最终调用该方法,表示结束
}
});

用 Subject 实现为

PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("as Observable");
publishSubject.onCompleted();

6.2 创建 Observer 订阅 Observable 并接收数据:

       // 2.创建观察者Observer
rx.Observer<String> observer = new rx.Observer<String>() {

@Override
public void onCompleted() {
// TODO Auto-generated method stub
System.out.println("onCompleted");
}

@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub
System.out.println("onError");
}

@Override
public void onNext(String arg0) {
// TODO Auto-generated method stub
System.out.println("onNext = " + arg0);
}

};

// 3.被观察者订阅观察者
observable.
// 建议在这修改数据
map(new Func1<String, String>()
{
// 第一个参数决定 call方法类型,第二个参数决定返回值类型
@Override
public String call(String arg0) {
return arg0 + "汤圆";
}
}).subscribe(observer);

用Subject实现为:

publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
});

6.3 把 Subject 当作 Observer 传入 subscribe() 中

PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {

subscriber.onNext("as Observer");
subscriber.onCompleted();
}
}).subscribe(publishSubject);

有没有发现问题?​​publishSubject​​​没有重写​​onNext()​​​方法啊,在哪接收的数据?这就是前面说的 ​​桥梁​​​ 的问题了,尽管把 ​​Subject​​​ 作为 ​​Observer​​​ 传入​​subscribe()​​​,但接收数据还是要通过 ​​Observer​​​ 来接收,借用 ​​Subject​​​ 来连接​​Observable​​​ 和 ​​Observer​​,整体代码如下:

PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {

subscriber.onNext("as Bridge");
subscriber.onCompleted();
}
}).subscribe(publishSubject);

publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

LogUtil.log("subject:"+s); //接收到 as Bridge
}
});

这就是桥梁的意思!


举报

相关推荐

0 条评论