1. 基本创建
create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
作用:
创建一个被观察者。
使用:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello Observer");
e.onComplete();
}
});
2. 快速创建 & 发送事件
2.1 just()
public static <T> Observable<T> just(T item)
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
作用:
创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。
使用:
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=================onComplete ");
}
});
2.2 fromArray()
public static <T> Observable<T> fromArray(T... items)
作用:
这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。
使用:
Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=================onComplete ");
}
});
2.3 fromIterable()
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
作用:
直接发送一个 List 集合数据给观察者
使用:
List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=================onComplete ");
}
});
2.4 empty() & never() & error()
public static <T> Observable<T> empty()
public static <T> Observable<T> never()
public static <T> Observable<T> error(final Throwable exception)
作用:
- empty() : 直接发送 onComplete() 事件
- never():不发送任何事件
- error():发送 onError() 事件
使用:
Observable.empty()
.subscribe(new Observer < Object > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe");
}
@Override
public void onNext(Object o) {
Log.d(TAG, "==================onNext");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete");
}
});
3. 延迟创建
3.1 defer()
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
作用:
这个方法的作用就是直到被观察者被订阅后才会创建被观察者。
使用:
// i 要定义为成员变量
Integer i = 100;
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
i = 200;
Observer observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);
i = 300;
observable.subscribe(observer);
3.2 timer()
public static Observable<Long> timer(long delay, TimeUnit unit)
......
作用:
当到指定时间后就会发送一个 0L 的值给观察者。
使用:
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "===============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
3.3 interval()
public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
......
作用:
每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。
使用:
Observable.interval(4, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
3.4 intervalRange()
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
作用:
可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。
使用:
Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
3.5 range()
public static Observable<Integer> range(final int start, final int count)
作用:
同时发送一定范围的事件序列。
使用:
Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Integer aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
3.6 rangeLong()
public static Observable<Long> rangeLong(long start, long count)
作用:
作用与 range() 一样,只是数据类型为 Long
使用:
用法与 range() 一样,这里就不再赘述了。