0
点赞
收藏
分享

微信扫一扫

Android RxJava2.0 之常见的操作符

Sophia的玲珑阁 2021-09-19 阅读 75
RxJava2

1. 基本创建

create()

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

作用:
创建一个被观察者。

使用:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello Observer");
        e.onComplete();
    }
});

2. 快速创建 & 发送事件

2.1 just()

public static <T> Observable<T> just(T item) 
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

作用:
创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。

使用:

Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});

2.2 fromArray()

public static <T> Observable<T> fromArray(T... items)

作用:
这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。

使用:

Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});

2.3 fromIterable()

public static <T> Observable<T> fromIterable(Iterable<? extends T> source)

作用:
直接发送一个 List 集合数据给观察者

使用:

List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});

2.4 empty() & never() & error()

public static <T> Observable<T> empty()
public static <T> Observable<T> never()
public static <T> Observable<T> error(final Throwable exception)

作用:

  • empty() : 直接发送 onComplete() 事件
  • never():不发送任何事件
  • error():发送 onError() 事件

使用:

Observable.empty()
.subscribe(new Observer < Object > () {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe");
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "==================onNext");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete");
    }
});

3. 延迟创建

3.1 defer()

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

作用:
这个方法的作用就是直到被观察者被订阅后才会创建被观察者。

使用:

// i 要定义为成员变量
Integer i = 100;
        
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        return Observable.just(i);
    }
});

i = 200;

Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

observable.subscribe(observer);

i = 300;

observable.subscribe(observer);

3.2 timer()

public static Observable<Long> timer(long delay, TimeUnit unit) 
......

作用:
当到指定时间后就会发送一个 0L 的值给观察者。

使用:

Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "===============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

3.3 interval()

public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
......

作用:
每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。

使用:

Observable.interval(4, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

3.4 intervalRange()

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

作用:
可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。

使用:

Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

3.5 range()

public static Observable<Integer> range(final int start, final int count)

作用:
同时发送一定范围的事件序列。

使用:

Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Integer aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

3.6 rangeLong()

public static Observable<Long> rangeLong(long start, long count)

作用:
作用与 range() 一样,只是数据类型为 Long

使用:
用法与 range() 一样,这里就不再赘述了。

举报

相关推荐

0 条评论