您现在的位置是:首页 >技术交流 >一.RxJava网站首页技术交流
一.RxJava
1.RxJava使用场景
RxJava核心思想
Rx思维:响应式编程,从起点到终点,中途不能断掉,并且可以在中途添加拦截.
生活中的例子:
起点(分发事件,我饿了)->下楼->去餐厅->点餐->终点(吃饭,消费事件)
程序中的例子:
起点(分发事件,点击登录)->登录API->请求服务器->获取响应码->终点(更新UI登录成功,消费事件)
总结:
有一个起点和一个终点,起点开始流向我们的“事件”,把事件流向终点,只不过在流向终点的过程中,可以增加拦截,拦截时可以对"事件进行改变",终点只关心他的上一个拦截.
Retrofit配合RxJava使用
Retrofit是对OkHttp网络请求框架的封装,我们将从OkHttp请求到数据的响应给到RxJava进行处理.
防抖
作用:防止重复操作.
举例1:
防止用户一直去请求获取验证码接口,黑客攻击1s内请求100次获取验证码接口;但是我们可以利用防抖思想,对其进行拦截,让他100次只做第一次处理,甚至一天之内最多只能请求5次获取验证码接口.
举例2:
我们点击某个按钮,可能存在重复点击的情况,我们可以利用RxBinding来防止重复点击做重复网络请求.
代码举例:
//TODO 5s内点击按钮只有第1次生效弹出Toast,超过5s后点击按钮才会第二次弹出Toast
RxView.clicks(findViewById(R.id.tv_fangdou))
.throttleFirst(5, TimeUnit.SECONDS)//表示5s内只有第一次点击生效
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Toast.makeText(UseActivity.this, "5s内只有第1次点击生效了", Toast.LENGTH_SHORT).show();
}
});
网络嵌套
先请求主数据,然后在根据主数据中的某个字段去请求子数据.
比如:我们先获取到某个用户的朋友列表,然后根据某个朋友的ID(如:张三)去查询朋友的信息.
解决方案:
可以采用flatMap
这种方式去做处理,可以实现多个嵌套的网络请求在同一层级上面展示,不会像多层嵌套那样不易阅读.
doOnNext运用
频繁的在主线程与子线程之间切换来完成我们的业务.
举例:
银行项目存在频繁在主线程与子线程之间切换,可以采用doOnNext
这种方式来解决.
2.RxJava模式与原理
标准观察者与RxJava观察者
标准观察者:
一个被观察者(Observable
),可以有多个观察者(Observer
),被观察者发生改变,所有订阅了他的观察者都能收到这个变化消息.
举例:
移动公司给所有用户发送一条短信,移动公司就作为被观察者,而所有的用户就作为观察者.
RxJava观察者流程:
- 创建Observable
- 创建Observer
- 使用subscribe()订阅
分析RxJava观察者流程时,不按照上面的步骤来:
- 查看Observer源码
- 定义了Observer接口的方法,比如:onSubscribe、onNext、onError、onComplete
- 然后在使用的时候,直接创建自定义观察者,将new新建的Observer传入作为参数,重写实现方法.
- 了解Observable创建过程,分析源码
- 调用create()方法会创建ObservableCreate对象
- 将自定义(ObservableOnSubscribe)source资源传入ObservableCreate对象,作为一个参数
- 了解subscribe订阅过程,分析源码
- 在订阅的过程中,首先执行观察者中onSubscribe方法,然后执行onNext/onError,最后执行onComplete方法
- subscribe方法传入的参数是观察者Observer,调用者是被观察者Observable,有一个中间层发射器ObservableEmitter
- 在执行subscribe方法时,最终会调用到Observable的实现类ObservableCreate的subscribeActual方法
标准观察者设计模式和RxJava观察者设计模式比较:
- 在标准观察者设计模式中,是一个被观察者,对应多个观察者,并且被观察者发出改变通知后,所有的观察者才能观察到;耦合度高.
- 在RxJava观察者设计模式中,是多个被观察者,一个观察者,并且需要起点和终点在订阅一次后,才发出改变通知,终点观察者才能观察到;耦合度低,也叫发布/订阅模式,也可以叫作观察者模式.
扩展知识:
RxJavaPlugins.setOnObservableAssembly()
可以实现Hook
,全局监听整个项目RxJava
执行了哪些Observable
;RxJavaPlugins
就是一个用来做全局监听的工具类,里面包含了多种功能.
map变换操作符原理
map是用来做类型转换的,比如:将String类型转换成Integer类型,也可以将一个对象映射成另外一个对象.
代码举例:
.map(new Function<String, Integer>() {//通过map中传入Function,将String转换成Integer类型
@Override
public Integer apply(String s) throws Exception {
//返回Integer类型
return 9527;
}
})
洋葱模型:
观察者
(终点
):
new Observer
作为参数传入订阅方法subscribe
订阅(subscribe(observer))
:- 这个方法中会调用
subscribeActual(observer)
方法,由于加入了map
拦截,所以由map
方法返回的ObservableMap
对象来调用subscribeActual
方法. ObservableMap.subscribeActual(observer)
方法中做了哪些事情:public void subscribeActual(Observer<? super U> t) { //MapObserver作为Observer的包装(封装/包裹),该类持有了Observer成员变量actual //通过MapObserver<T, U>对类型进行转换,将T类型转换成U类型 //这里的source是上一层传递过来的对象,而MapObserver是封装的是下一层的包裹(Observer)t source.subscribe(new MapObserver<T, U>(t, function)); }
- 第一次包装,采用
MapObserver
进行包装,这里的参数t
就是观察者Observer
.
- 这个方法中会调用
map
:
作用:卡片拦截,在被观察者与观察者之间添加拦截,可以进行类型转换.
流程分析:- 该方法返回包装类
ObservableMap<T, U>
,这个类可以将T
类型转换成U
类型并返回; - 最终体现在
map
方法参数Function
类的apply
方法中,将转换后的U
类型返回.
- 该方法返回包装类
map(多重拦截)
:- 对上一次包裹
Observer
进行再次包装,采用的MapObserver
进行包装.这里的参数t
就是上一次包装生成的观察者Observer
. - 最终由
Observable
的实现类ObservableCreate
来调用该方法; ObservableCreate.subscribeActual(observer)
方法中做了哪些事情:protected void subscribeActual(Observer<? super T> observer) { //1.包装观察者,将观察者作为参数传入创建的发射器对象Emitter //由于我们做过拦截,所以这里传入的是包装后的Observer CreateEmitter<T> parent = new CreateEmitter<T>(observer); //2.调用onSubscribe方法,所以这个方法早于我们的执行流程 observer.onSubscribe(parent); try { //3.自定义source开始订阅,并将发射器作为参数传入; //这个方法就会执行到我们自定义ObservableOnSubscribe的subscribe方法,这里就会去拆包裹 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); //如果报错走最外层包裹的onError方法 parent.onError(ex); } }
- 对上一次包裹
create
:
创建ObservableCreate
对象并返回,并将自定义ObservableOnSubscribe
作为source
参数传入.- 最后一次包装,采用
CreateEmitter
进行包装,代码如下:
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
- 最后一次包装,采用
自定义source
:
在自定义ObservableOnSubscribe
的subscribe
方法中,可以去执行onNext
方法.通过查看源码流程走向,调用该方法后,就会依次调用每一个Observable
实现类中内部包装类的onNext
方法,最终调用到我们通过new
创建的Observer
中的onNext
方法.
总结:
- 首先,RxJava的执行流程是从上往下的,依次创建Observable的实现类,最终调用订阅subscribe方法;
- 其次,调用完订阅subscribe方法后,就开始从下往上依次对观察者Observer封装包裹.
说明:(source.subscribe(包装类(observer)),是封装包裹发起者
)
map方法的包装类是MapObserver,返回的实现类是ObservableMap对象;
create方法的包装类是CreateEmitter,返回的实现类是ObservableCreate对象;- 最后,我们在自定义source(
ObservableOnSubscribe
)的回调方法subscribe方法中,执行包装类的onNext或onComplete方法时,就会从上往下,依次从外向内开始拆包裹.
说明:(包装类.onNext和onSubscribe是拆包裹发起者
)
依次执行当前包裹中封装的Observer的onNext或onComplete方法,最终执行到我们自定义Observer的onNext或onComplete方法,至此完成整个流程.
RxJava中map流程图如下:
背压
消费的速度跟不上生产的速度时,就存在背压的问题,我们可以采用Flowable
替换Observable
来解决背压的问题.
3.RxJava原理与自定义操作符
线程切换原理
subscribeOn()
给上面的代码分配线程
Schedulers.io()
最终会通过线程池来进行管理,因此后面执行的任务都是在子线程中进行.
执行步骤:(Schedulers.io() == IoScheduler(持有线程池)
)- Schedulers.io()->(Scheduler)Schedulers.IO->new IOTask()->IOTask.run()->IoHolder.DEFAULT->new
IoScheduler()
->IoScheduler.start()->new CachedWorkerPool ->CachedWorkerPool
类持有线程池变量:ScheduledExecutorService evictorService
//构造函数中将线程池变量evictor赋值给成员变量evictorService CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { //省略无关代码 ScheduledExecutorService evictor = null; if (unit != null) { evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); } //创建线程池并赋值给evictorService成员变量 evictorService = evictor; }
- Schedulers.io()->(Scheduler)Schedulers.IO->new IOTask()->IOTask.run()->IoHolder.DEFAULT->new
observeOn()
给下面的代码分配线程
AndroidSchedulers.mainThread()
最终是通过Handler来完成子线程到主线程的切换,因此后面的代码可以更新UI.
执行步骤:(AndroidSchedulers.mainThread() == HandlerScheduler(handler)
)- AndroidSchedulers.mainThread()->(Scheduler)Schedulers.MAIN_THREAD->MainHolder.DEFAULT->new
HandlerScheduler(new Handler(Looper.getMainLooper()))
; - 这里传递了主线程的Looper对象给Handler,以确保代码执行在主线程中.
- AndroidSchedulers.mainThread()->(Scheduler)Schedulers.MAIN_THREAD->MainHolder.DEFAULT->new
RxJava中onserveOn(AndroidSchedulers.mainThread())流程图:
扩展知识
观察者Observer
的回调方法中会返回一个Disposable
对象,我们在页面销毁的时候,需要判断这个对象Disposable
是否销毁dispose
了,如果没有销毁需要将其销毁.
这样的目的是为了防止内存泄漏,解决在页面销毁的时候,还在执行后面onNext
和onComplete
的逻辑操作的问题.
代码如下:
//使用结果赋值给一个成员变量,在生命周期结束时销毁他
private Disposable mDisposable;
private void doSomething() {
disposable = Observable.create((ObservableOnSubscribe<String>) e -> {
e.onNext("第一步");
e.onNext("第二步");
e.onComplete();
}).subscribe(s -> {
});
}
@Override
protected void onDestroy() {
super.onDestroy();
//结束生命周期销毁disposable
if (mDisposable != null && !mDisposable.isDisposed()){
mDisposable.dispose();
}
}
自定义RxView操作符
主要是通过自定义Observable
继承自Observable
,重写subscribeActual(observer)
方法,然后在该方法中通过source.subscribe(包装类)
,将我们封装了下一层Observer
的包装类
传递进来封装包裹;包装类需要实现Disposable
,达到可以被中断的目的,同时需要包含下一层包裹Observer
变量,以便一层层调用每一层包裹Observer
的方法.
总结:
整体实现流程
- 通过由上往下一层一层调用Observable的各种方法,创建出Observable的具体实现类.
比如:通过调用create()方法,会返回ObservableCreate实现类,通过map()方法会返回ObservableMap实现类,通过observeOn()会返回OnservableObserveOn实现类;总之,就是在Observable后面拼接方法名称构成一个对象.- 通过由下往上调用Observable.subscribe(observer)方法封装包裹,接着调用具体实现类中的subscribeActual(observer)方法来完成,最终将封装的包裹通过这种方式传入:
source.subscribe(new 包装类(observer);
参数说明:
source:
表示create方法中通过new传入的ObservableOnSubscribe对象
observer:
表示下一层封装的包裹,每一个包裹都是一个包装类,包装类都实现了Observer接口
subscribe():
方法表示ObservableOnSubscribe重写的subscribe(包装类
)方法,里面的参数对应每一层的包装类;一般包装类调用onNext方法时,就会调用到包装类中的Observer对象onNext方法,依次达到一层一层往下拆包裹的目的.- 通过由上往下依次调用每个
具体实现类
.包装类
中的onNext方法时,就会直接调用调用observer.onNext方法,这里observer对象就是下一层包裹,因为每一层包裹都实现了Observer接口,以此达到了一层一层往下调用每一层包裹中onNext方法的目的.