前言
目前 RxJava 最新版本是 V3.0.11。
 GitHub 地址: RxJava
 RxJava2 有以下三个基本的元素:
- 被观察者(Observable)
- 观察者(Observer)
- 订阅(subscribe)
基本使用
本文还是使用 RxJava2.0 ,先添加 gradle 配置:
//RxJava
    implementation 'io.reactivex.rxjava2:rxjava:2.0.1'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'基本使用:
// 1. 创建一个 Observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        });
        // 2. 创建一个 Observer
        Observer<Integer> observer = new Observer<Integer>() {
            
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }
            
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: " + value);
            }
            
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }
            
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };
        //3. 建立连接
        observable.subscribe(observer);运行的结果:
2021-03-27 14:36:20.817 15995-15995/com.hk.test D/MainActivity: onSubscribe: 
2021-03-27 14:36:20.817 15995-15995/com.hk.test D/MainActivity: onNext: 1
2021-03-27 14:36:20.818 15995-15995/com.hk.test D/MainActivity: onNext: 2
2021-03-27 14:36:20.818 15995-15995/com.hk.test D/MainActivity: onNext: 3
2021-03-27 14:36:20.818 15995-15995/com.hk.test D/MainActivity: onComplete:注意: 只有当被观察者和观察者建立连接之后, 被观察者才会开始发送事件. 也就是调用了subscribe() 方法之后才开始发送事件。
把这段代码连起来就成了 Rxjava 引以为傲的链式操作:
Observable.create(new ObservableOnSubscribe<Integer>() {
            
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }
            
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: " + value);
            }
            
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }
            
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });- onNext()
 发送该事件时,观察者会回调 onNext() 方法
- onError()
 发送该事件时,观察者会回调 onError() 方法,当发送该事件之后,其他事件将不会继续发送
- onComplete()
 发送该事件时,观察者会回调 onComplete() 方法,当发送该事件之后,其他事件将不会继续发送
1)每隔1小时执行任务
Observable.interval(0, 60 * 60 * 1000, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.io())
                .subscribe(new Observer<Long>() {
                    
                    public void onSubscribe( Disposable disposable) {
                    }
                    
                    public void onNext( Long number) {
                        initIntentionData();
                        initGame();
                    }
                    
                    public void onError( Throwable e) {
                    }
                    
                    public void onComplete() {
                    }
                });- 子线程执行耗时操作
Observable.just("start")
                .map(new Function<String, ArrayList<CardInfoBean>>() {
                    
                    public ArrayList<CardInfoBean> apply(String table) throws Throwable {
                        ArrayList<CardInfoBean> infoBeans = CardSQLiteManager.getInstance(AIEHomeApplication.getAppContext())
                                .queryShowCard(CardSQLiteManager.TableNameConst.CARD_TABLE, 1);
                        return infoBeans;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<ArrayList<CardInfoBean>>() {
                    
                    public void accept(ArrayList<CardInfoBean> cardInfos) throws Throwable {
                        mData = cardInfos;
                    }
                });小结
其实可以把 RxJava 比喻成一个做果汁,家里有很多种水果(要发送的原始数据),你想榨点水果汁喝一下,这时候你就要想究竟要喝什么水果汁呢?如果你想喝牛油果雪梨柠檬汁,那你就要把这三种水果混在一起榨汁(使用各种操作符变换你想发送给观察者的数据),榨完后,你就可以喝上你想要的果汁了(把处理好的数据发送给观察者)。
                
                










