0
点赞
收藏
分享

微信扫一扫

【RxJava】just、from方法的基本原理

陆公子521 2022-02-25 阅读 86

前面的关于create的文章已经介绍了,Observable被订阅时的流程。
接下来看一看just的源码。

just有多个重载方法,内部的实现有两种方式:

  • 通过ScalarSynchronousObservable
  • 间接调用from方法

ScalarSynchronousObservable

public static <T> Observable<T> just(final T value) {
    return ScalarSynchronousObservable.create(value);
}

前面的文章介绍,最终会调用OnSubscribe的call方法,从下面的代码看到使用的是子类JustOnSubscribe,所以我们直接去看JustOnSubscribe。

public final class ScalarSynchronousObservable<T> extends Observable<T>{
	protected ScalarSynchronousObservable(final T t) {
	    super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t)));
	    this.t = t;
	}
}

在JustOnSubscribe的call方法,调用了Subscriber的s.setProducer方法。

static final class JustOnSubscribe<T> implements OnSubscribe<T> {
    final T value;

    JustOnSubscribe(T value) {
        this.value = value;
    }

    @Override
    public void call(Subscriber<? super T> s) {
        s.setProducer(createProducer(s, value));
    }
}

public void setProducer(Producer p) {
   long toRequest;
   ...
   ...
   内部最终调用的是Producer的request方法
   producer.request(Long.MAX_VALUE);
}

static <T> Producer createProducer(Subscriber<? super T> s, T v) {
    if (STRONG_MODE) {
        return new SingleProducer<T>(s, v);
    }
    return new WeakSingleProducer<T>(s, v);
}

最后追踪到WeakSingleProducer,可见其中的request方法调用的Subscriber的onNext和onComplete方法。

static final class WeakSingleProducer<T> implements Producer {
    final Subscriber<? super T> actual;
    final T value;
    boolean once;

    public WeakSingleProducer(Subscriber<? super T> actual, T value) {
        this.actual = actual;
        this.value = value;
    }

    @Override
    public void request(long n) {
		...
        T v = value;
        Subscriber<? super T> a = actual;
        ...
        a.onNext(v);
        a.onCompleted();
    }
}

总结

回顾create方法的使用:

Observable.create(new Observable.OnSubscribe<Integer>() {
     @Override
     public void call(Subscriber<? super Integer> subscriber) {
         subscriber.onNext(1);
         subscriber.onCompleted();
     }
 })

from

public static <T> Observable<T> just(T t1, T t2) {
    return from((T[])new Object[] { t1, t2 });
}
public static <T> Observable<T> from(T[] array) {
    return unsafeCreate(new OnSubscribeFromArray<T>(array));
}
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
    final T[] array;
    public OnSubscribeFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }
}
static final class FromArrayProducer<T>
extends AtomicLong
implements Producer {

    final Subscriber<? super T> child;
    final T[] array;

    int index;

    public FromArrayProducer(Subscriber<? super T> child, T[] array) {
        this.child = child;
        this.array = array;
    }

    @Override
    public void request(long n) {

        if (n == Long.MAX_VALUE) {
            if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                fastPath();
            }
        } else
        if (n != 0) {
            if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                slowPath(n);
            }
        }
    }

    void fastPath() {
		省略部分代码
	          child.onNext(t);
		省略部分代码
	          child.onCompleted();
    }

    void slowPath(long r) {
    	省略部分代码
	          child.onNext(t);
		省略部分代码
	          child.onCompleted();
    }
}
举报

相关推荐

0 条评论