您现在的位置是:首页 >技术教程 >RxJava中DISPOSED状态的被观察者任务执行onError/onSuccess导致的崩溃问题网站首页技术教程
RxJava中DISPOSED状态的被观察者任务执行onError/onSuccess导致的崩溃问题
简介RxJava中DISPOSED状态的被观察者任务执行onError/onSuccess导致的崩溃问题
RxJava中写了doOnError但还是导致应用崩溃问题记录
一、问题背景
最近在崩溃后台发现了一些业务代码的crash记录,根据堆栈去定位代码调用位置时,发现是用的RxJava写的一个异步任务的逻辑,但是有在subscribe中链式调用subscribe(onSuccess,onError)。主要是在Activity#onResume时创建并启动一个耗时任务,onPause时将onResume时创建的Disposable任务进行dispose。
1.1 崩溃堆栈
1.2 写demo代码复现相同逻辑
// 大概代码是这样子,仅仅是写了一段用来测试的代码,不用关注具体业务逻辑。
getLifecycle().addObserver(new LifecycleEventObserver() {
@Override
public void onStateChanged(@NonNull LifecycleOwner source, @NonNull Lifecycle.Event event) {
// onResume时执行耗时任务,请求数据
// onPause时dispose任务
if (event == Lifecycle.Event.ON_RESUME) {
mDisposable = Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
if (mDisposable != null && mDisposable.isDisposed()) {
Log.d(TAG, "subscribe: disposable=" + mDisposable + ", is disposed!");
return;
}
boolean isSuccessful = mApi.loadData() != null;
if (isSuccessful) {
emitter.onSuccess("120");
} else {
// 传递异常
emitter.onError(new IllegalStateException("process data error!"));
}
}
})
// only for test!!
.flatMap(new Function<String, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(String s) throws Exception {
return Single.just(Integer.parseInt(s));
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
// 这里对于onError是有处理的。
// !!如果写的是doOnError()则还是会导致外层崩溃。
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doOnSuccess accept: " + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "error accept: " + throwable);
}
});
} else if (event == Lifecycle.Event.ON_PAUSE) {
if (mDisposable != null && !mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
}
});
我按照跟实际业务逻辑场景写了上述原型代码,操作了很多次onResume和onPause之间切换,未复现该崩溃。
二、问题等价还原-复现
经过对RxJava代码执行流程的分析,发现SingleOnSubscribe#subscribe中对SingleEmitter#onError->doOnError(Consumer)的调用是有条件,必须要当前的Single是非Disposed状态,判断逻辑如下:
2.1 代码位置:io.reactivex.internal.operators.single.SingleCreate.Emitter#onError
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (get() != DisposableHelper.DISPOSED) {
Disposable d = getAndSet(DisposableHelper.DISPOSED);
// 只有当异步任务是非Disposed状态时才会转调到Single添加的onError()回调
if (d != DisposableHelper.DISPOSED) {
try {
downstream.onError(t);
} finally {
if (d != null) {
d.dispose();
}
}
return true;
}
}
return false;
}
尝试把异步任务mApi.loadData()延迟2000ms后,频繁切换Activity的onResume和onPuase状态,崩溃复现。
- 因为Single任务的状态时DISPOSED,所以tryOnError()返回false,走RxJavaPlugins.onError(),
看下RxJavaPlugins.onError()的实现代码:
// io.reactivex.plugins.RxJavaPlugins#onError
public static void onError(@NonNull Throwable error) {
// (1)
Consumer<? super Throwable> f = errorHandler;
if (error == null) {
error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
} else {
// (2)
if (!isBug(error)) {
error = new UndeliverableException(error);
}
}
if (f != null) {
try {
// (3)
f.accept(error);
return;
} catch (Throwable e) {
// (4) 这里的处理就比较有争议了,
// Exceptions.throwIfFatal(e); TODO decide
e.printStackTrace(); // NOPMD
uncaught(e);
}
}
// (5)
// f为null,导致走到这里
error.printStackTrace(); // NOPMD
uncaught(error);
}
- 先看下uncaught()方法的实现逻辑:
static void uncaught(@NonNull Throwable error) {
Thread currentThread = Thread.currentThread();
UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler();
handler.uncaughtException(currentThread, error);
}
天秀!居然直接获取了当前的UncaughtExceptionHandler然后转调uncaughtException,要知道这样会走应用的崩溃上报逻辑,即使是逻辑上书写的错误,也会导致崩溃上报(一般应用自定义的UncaughtExceptionHandler会弹出崩溃页面,并让用户确认是否上报崩溃日志,最后将进程kill掉)。
- (1) 这里的errorHandler就是我们RxJavaPlugins.setErrorHandler传入的异常处理器。但是发生这个问题时,并未传入一个全局的异常处理器。
- (2) 如果不是isBug中定义的Exception类型,如果不是isBug中定义的Exception类型。
另外isBug()也需要关注下,共定义了6种Exception类型认为是bug,常见的IOException、FileNotFoundException这些并没有包含其中。
static boolean isBug(Throwable error) {
// user forgot to add the onError handler in subscribe
if (error instanceof OnErrorNotImplementedException) {
return true;
}
// the sender didn't honor the request amount
// it's either due to an operator bug or concurrent onNext
if (error instanceof MissingBackpressureException) {
return true;
}
// general protocol violations
// it's either due to an operator bug or concurrent onNext
if (error instanceof IllegalStateException) {
return true;
}
// nulls are generally not allowed
// likely an operator bug or missing null-check
if (error instanceof NullPointerException) {
return true;
}
// bad arguments, likely invalid user input
if (error instanceof IllegalArgumentException) {
return true;
}
// Crash while handling an exception
if (error instanceof CompositeException) {
return true;
}
// everything else is probably due to lifecycle limits
return false;
}
- (3) f即errorHandler对象,如果f不为空,并且accept()方法未抛出异常,那么本次Single异步任务状态时DISPOSED也不会发生崩溃。
- (4) f.accept()方法:
如果f.accept内直接将error抛出来,则除了这里会走一遍uncaught的处理逻辑,并且(5)代码位置又会上报一遍。这会导致同一个异常被上报两次,崩溃后台数据直接x2,如果上线了,那么你的稳定性-崩溃率这块数据可能会被影响了。
所以f.accept()方法的实现很讲究了,如果真的需要再抛出异常,那么就需要换一种类型,其实最好限制下仅当开发环境下给抛出,让应用崩溃,促进bug尽早发现降低上线后崩溃风险。
三、修复方法
3.1 方案一:设置全局的errorHandler,需要这一条处理兜底,但不要滥用。
RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
// 如果是debug环境下,就让异常抛出去,尽早暴露问题。
if (BuildConfig.DEBUG) {
throw new RuntimeException(throwable);
}
// 这里打印log trace也要注意,太频繁会损耗性能,
// 不能过分依靠这个全局的异常处理,
// 尽量在自己的业务代码逻辑中处理完善
Log.d(TAG, "RxJava error handler accept: " + Log.getStackTraceString(throwable));
}
});
3.2 方案二:在异步任务执行完毕时判判异步任务是否是DISPOSED状态
四、反思RxJava的使用问题
4.1 任务已经dispose了,为什么还会走onError/onSuccess?
4.2 disposed状态下的任务,走到onError/onSuccess选择直接抛出异常,为什么这样设计?
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。