您现在的位置是:首页 >技术教程 >RxJava中DISPOSED状态的被观察者任务执行onError/onSuccess导致的崩溃问题网站首页技术教程

RxJava中DISPOSED状态的被观察者任务执行onError/onSuccess导致的崩溃问题

TechMix 2023-06-09 00:00:03
简介RxJava中DISPOSED状态的被观察者任务执行onError/onSuccess导致的崩溃问题

一、问题背景

    最近在崩溃后台发现了一些业务代码的crash记录,根据堆栈去定位代码调用位置时,发现是用的RxJava写的一个异步任务的逻辑,但是有在subscribe中链式调用subscribe(onSuccess,onError)。主要是在Activity#onResume时创建并启动一个耗时任务,onPause时将onResume时创建的Disposable任务进行dispose。

1.1 崩溃堆栈

RxJava崩溃堆栈

1.2 写demo代码复现相同逻辑

// 大概代码是这样子,仅仅是写了一段用来测试的代码,不用关注具体业务逻辑。
getLifecycle().addObserver(new LifecycleEventObserver() {
            @Override
            public void onStateChanged(@NonNull LifecycleOwner source, @NonNull Lifecycle.Event event) {
                // onResume时执行耗时任务,请求数据
                // onPause时dispose任务
                if (event == Lifecycle.Event.ON_RESUME) {
                    mDisposable = Single.create(new SingleOnSubscribe<String>() {
                                @Override
                                public void subscribe(SingleEmitter<String> emitter) throws Exception {
                                    if (mDisposable != null && mDisposable.isDisposed()) {
                                        Log.d(TAG, "subscribe: disposable=" + mDisposable + ", is disposed!");
                                        return;
                                    }
                                    boolean isSuccessful = mApi.loadData() != null;
                                    if (isSuccessful) {
                                        emitter.onSuccess("120");
                                    } else {
                                        // 传递异常
                                        emitter.onError(new IllegalStateException("process data error!"));
                                    }
                                }
                            })
                            // only for test!!
                            .flatMap(new Function<String, SingleSource<Integer>>() {
                                @Override
                                public SingleSource<Integer> apply(String s) throws Exception {
                                    return Single.just(Integer.parseInt(s));
                                }
                            })
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            // 这里对于onError是有处理的。
                            // !!如果写的是doOnError()则还是会导致外层崩溃。
                            .subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.d(TAG, "doOnSuccess accept: " + integer);
                                }
                            }, new Consumer<Throwable>() {
                                @Override
                                public void accept(Throwable throwable) throws Exception {
                                    Log.d(TAG, "error accept: " + throwable);
                                }
                            });

                } else if (event == Lifecycle.Event.ON_PAUSE) {
                    if (mDisposable != null && !mDisposable.isDisposed()) {
                        mDisposable.dispose();
                    }
                }
            }
        });

我按照跟实际业务逻辑场景写了上述原型代码,操作了很多次onResume和onPause之间切换,未复现该崩溃。

二、问题等价还原-复现

    经过对RxJava代码执行流程的分析,发现SingleOnSubscribe#subscribe中对SingleEmitter#onError->doOnError(Consumer)的调用是有条件,必须要当前的Single是非Disposed状态,判断逻辑如下:

2.1 代码位置:io.reactivex.internal.operators.single.SingleCreate.Emitter#onError

@Override
public void onError(Throwable t) {
    if (!tryOnError(t)) {
        RxJavaPlugins.onError(t);
    }
}
@Override
public boolean tryOnError(Throwable t) {
    if (t == null) {
        t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    }
    if (get() != DisposableHelper.DISPOSED) {
        Disposable d = getAndSet(DisposableHelper.DISPOSED);
        // 只有当异步任务是非Disposed状态时才会转调到Single添加的onError()回调
        if (d != DisposableHelper.DISPOSED) {
            try {
                downstream.onError(t);
            } finally {
                if (d != null) {
                    d.dispose();
                }
            }
            return true;
        }
    }
    return false;
}

尝试把异步任务mApi.loadData()延迟2000ms后,频繁切换Activity的onResume和onPuase状态,崩溃复现。
在这里插入图片描述

  • 因为Single任务的状态时DISPOSED,所以tryOnError()返回false,走RxJavaPlugins.onError(),
    看下RxJavaPlugins.onError()的实现代码:
// io.reactivex.plugins.RxJavaPlugins#onError
public static void onError(@NonNull Throwable error) {
		// (1)
        Consumer<? super Throwable> f = errorHandler;

        if (error == null) {
            error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        } else {
            // (2) 
            if (!isBug(error)) {
                error = new UndeliverableException(error);
            }
        }

        if (f != null) {
            try {
            	// (3)
                f.accept(error);
                return;
            } catch (Throwable e) {
            	// (4) 这里的处理就比较有争议了,
                // Exceptions.throwIfFatal(e); TODO decide
                e.printStackTrace(); // NOPMD
                uncaught(e);
            }
        }
        // (5)
		// f为null,导致走到这里
        error.printStackTrace(); // NOPMD
        uncaught(error);
    }
  • 先看下uncaught()方法的实现逻辑:
static void uncaught(@NonNull Throwable error) {
   Thread currentThread = Thread.currentThread();
   UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler();
   handler.uncaughtException(currentThread, error);
}

天秀!居然直接获取了当前的UncaughtExceptionHandler然后转调uncaughtException,要知道这样会走应用的崩溃上报逻辑,即使是逻辑上书写的错误,也会导致崩溃上报(一般应用自定义的UncaughtExceptionHandler会弹出崩溃页面,并让用户确认是否上报崩溃日志,最后将进程kill掉)。

  • (1) 这里的errorHandler就是我们RxJavaPlugins.setErrorHandler传入的异常处理器。但是发生这个问题时,并未传入一个全局的异常处理器。
  • (2) 如果不是isBug中定义的Exception类型,如果不是isBug中定义的Exception类型。
    另外isBug()也需要关注下,共定义了6种Exception类型认为是bug,常见的IOException、FileNotFoundException这些并没有包含其中。
static boolean isBug(Throwable error) {
    // user forgot to add the onError handler in subscribe
    if (error instanceof OnErrorNotImplementedException) {
        return true;
    }
    // the sender didn't honor the request amount
    // it's either due to an operator bug or concurrent onNext
    if (error instanceof MissingBackpressureException) {
        return true;
    }
    // general protocol violations
    // it's either due to an operator bug or concurrent onNext
    if (error instanceof IllegalStateException) {
        return true;
    }
    // nulls are generally not allowed
    // likely an operator bug or missing null-check
    if (error instanceof NullPointerException) {
        return true;
    }
    // bad arguments, likely invalid user input
    if (error instanceof IllegalArgumentException) {
        return true;
    }
    // Crash while handling an exception
    if (error instanceof CompositeException) {
        return true;
    }
    // everything else is probably due to lifecycle limits
    return false;
}
  • (3) f即errorHandler对象,如果f不为空,并且accept()方法未抛出异常,那么本次Single异步任务状态时DISPOSED也不会发生崩溃。
  • (4) f.accept()方法:
        如果f.accept内直接将error抛出来,则除了这里会走一遍uncaught的处理逻辑,并且(5)代码位置又会上报一遍。这会导致同一个异常被上报两次,崩溃后台数据直接x2,如果上线了,那么你的稳定性-崩溃率这块数据可能会被影响了。
        所以f.accept()方法的实现很讲究了,如果真的需要再抛出异常,那么就需要换一种类型,其实最好限制下仅当开发环境下给抛出,让应用崩溃,促进bug尽早发现降低上线后崩溃风险。

三、修复方法

3.1 方案一:设置全局的errorHandler,需要这一条处理兜底,但不要滥用。

RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        // 如果是debug环境下,就让异常抛出去,尽早暴露问题。
        if (BuildConfig.DEBUG) {
            throw new RuntimeException(throwable);
        }
        // 这里打印log trace也要注意,太频繁会损耗性能,
        // 不能过分依靠这个全局的异常处理,
        // 尽量在自己的业务代码逻辑中处理完善
        Log.d(TAG, "RxJava error handler accept: " + Log.getStackTraceString(throwable));
    }
});

3.2 方案二:在异步任务执行完毕时判判异步任务是否是DISPOSED状态

在这里插入图片描述

四、反思RxJava的使用问题

4.1 任务已经dispose了,为什么还会走onError/onSuccess?

4.2 disposed状态下的任务,走到onError/onSuccess选择直接抛出异常,为什么这样设计?

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。