/**
 * Returns {@code object} unchanged when it is non-null; otherwise fails fast.
 *
 * @param <T> the type of the checked reference
 * @param object the reference to check
 * @param message the detail message used for the exception when {@code object} is null
 * @return {@code object}, guaranteed non-null
 * @throws NullPointerException if {@code object} is null, carrying {@code message}
 */
public static <T> T requireNonNull(T object, String message) {
    if (object != null) {
        return object;
    }
    throw new NullPointerException(message);
}
3.接着看方法第三行中 new FlowableJust(item) 创建的实例对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/**
 * A {@link Flowable} that carries a single value fixed at construction time.
 *
 * <p>The same value is exposed synchronously through {@link #call()} and is
 * handed to each subscriber wrapped in a {@code ScalarSubscription}.
 *
 * @param <T> the type of the held value
 */
public final class FlowableJust<T> extends Flowable<T> implements ScalarCallable<T> {

    /** The held value; final, so it cannot change after the constructor assigns it. */
    private final T value;

    /**
     * Creates an instance holding {@code value}.
     *
     * @param value the value to hold
     */
    public FlowableJust(final T value) {
        this.value = value;
    }

    /**
     * Returns the held value directly.
     *
     * @return the value given to the constructor
     */
    @Override
    public T call() {
        return value;
    }

    /**
     * Overrides {@code Flowable.subscribeActual}: builds a {@code ScalarSubscription}
     * from the subscriber and the held value, then passes that subscription to the
     * subscriber's {@code onSubscribe}.
     *
     * @param s the subscriber being attached
     */
    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        ScalarSubscription<T> subscription = new ScalarSubscription<T>(s, value);
        s.onSubscribe(subscription);
    }
}
4.然后看RxJavaPlugins.onAssembly()方法
1 2 3 4 5 6 7 8 9 10
@SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static <T> Flowable<T> onAssembly(@NonNull Flowable<T> source) { //在这里钩子函数为null,即直接返回source Function<? super Flowable, ? extends Flowable> f = onFlowableAssembly; if (f != null) { return apply(f, source); } return source; }
/**
 * Callback interface with a single method that receives one value.
 *
 * @param <T> the type of value accepted
 */
public interface Consumer<T> {

    /**
     * Consumes the given value.
     *
     * @param t the value passed to this callback
     * @throws Exception on error; implementations may throw checked exceptions
     */
    void accept(T t) throws Exception;
}
6.接着看subscribe()方法
1 2 3 4 5 6 7 8 9
@CheckReturnValue @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext) { //onNext即要去消费的接口对象 //其他传入参数为默认的onError,onComplete接口回调,和最大数量的订阅对象 return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE); }
7.然后去看第二个subscribe()内部方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/**
 * Subscribes with explicit callbacks for items, errors, completion and the
 * subscription itself.
 *
 * <p>Each callback is null-checked, then all four are bundled into a
 * {@code LambdaSubscriber}.
 *
 * <p>NOTE(review): this excerpt is cut off after the {@code LambdaSubscriber} is
 * constructed; the statements that use it and the return are not visible here.
 *
 * @param onNext callback for each item; must not be null
 * @param onError callback for an error; must not be null
 * @param onComplete callback for completion; must not be null
 * @param onSubscribe callback receiving the {@code Subscription}; must not be null
 * @throws NullPointerException if any argument is null
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Subscription> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    // After the null checks above, combine the arguments into a single LambdaSubscriber.
    LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
@BackpressureSupport(BackpressureKind.SPECIAL) @SchedulerSupport(SchedulerSupport.NONE) @Beta public final void subscribe(FlowableSubscriber<? super T> s) { ObjectHelper.requireNonNull(s, "s is null"); try { // 传入当前要观察的对象和处理方法对象构建了一个观察者对象 Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");
//在这传入一个观察者对象 subscribeActual(z); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Subscription has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }