博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
关于RxJava在业务上的一些思考
阅读量:7003 次
发布时间:2019-06-27

本文共 8165 字,大约阅读时间需要 27 分钟。

最近在工作中,频繁的使用了Rxjava来解决一些问题,在使用过程中也给予了自己一些思考,如何使用好RxJava,在什么样的场景中才能发挥它更好的作用,如何脱离代码来理解RxJava的工作机制,下面是自己一些浅显的思考。

示例

太多示例喜欢链式的把RxJava的流程表述起来,这个地方我把观察者和订阅者拆开来看。

Observable
observable = Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
emitter) throws Exception { emitter.onNext("123"); } }); Observer observer = new Observer
() { ... @Override public void onNext(String s) { Log.i("TAG", "onNext: " + s); } };observable.subscribe(observer);复制代码

这个简单的例子大家应该都知道,只要subcribe产生了订阅,onNext方法将会收到 emitter.onNext("123"); 发射出去的数据。

这个地方让我产生思考主要是有一次去吃自助餐,大家在打酸奶的时候,都会拿着一个杯子对准出口,然后按住开关,酸奶就会自动流到杯子中。在这个过程中,我们不妨把酸奶机看做 Observable ,酸奶机里面的酸奶是许许多多的 emitter.onNext("123") ,按住开关的那一刻产生了 subscribe 订阅,然后我们是用杯子 Observer 去接牛奶的,当然,我们还有橙子机、酸梅汤机等,则机子内盛的饮料类型就是 Observable<String> 。我们知道,酸奶机有很多个开关入口,这时候,又来一个人,拿着杯子Observer来打牛奶,那么,我和他是一块共享这酸奶机里面的酸奶,我们俩都能接收到酸奶,等我们不需要接酸奶了,我们就dispose关闭开关。

eg:

Observable
observable = Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
observableEmitter) throws Exception { //模拟耗时任务 for (int i = 0; i < 5; i++) { observableEmitter.onNext(""+i); } } }).subscribeOn(Schedulers.io());//杯子1observable.subscribe(observer1);//杯子2observable.subscribe(observer2);复制代码

result:

observer1 onNext: 0observer2 onNext: 0observer1 onNext: 1observer2 onNext: 1observer1 onNext: 2observer2 onNext: 2observer1 onNext: 3observer2 onNext: 3observer1 onNext: 4observer2 onNext: 4复制代码

事件驱动的思考

之前在思考事件驱动这一块,如何更好的通知其他业务组件,业界比较有名的当属EventBus,但EventBus用起来很杂乱无章,当项目规模大起来,业务复杂起来时,都不敢修改这个post,虽然解耦了,但事件变得更乱了,所以,自己重新思考了事件驱动这一块。

鉴于EventBus提供的的思路,我打算用RxJava的方式来实现。以酸奶机为例,当前页面我想订阅一个事件,等待被触发,我完全可以先准备一个杯子(Observer),然后将他们存到一个集合里面,待酸奶机(Observable)里面有酸奶了(observableEmitter.onNext),然后订阅(subcribe)这个杯子的集合,将酸奶倒到杯子里,鉴于此思路,用代码大致的实现下。

List
list = new ArrayList<>(); //注册事件 public void registerObserver(Observer observer) { list.add(observer); } //驱动事件 public void postEvent() { Observable
observable = Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
emitter) throws Exception { emitter.onNext("123"); } }); for (int i = 0; i < list.size(); i++) { observable.subscribeOn(Schedulers.io()).subscribe(list.get(i)); } } @Test public void Test() { Observer observer = new Observer
() { ... @Override public void onNext(String s) { System.out.println("onNext: " + s); } }; //注册事件 registerObserver(observer); //发送事件 postEvent(); }复制代码

这里只给了大致的思路,用了一个平时都可见的例子来实现了事件驱动。

异步回调的思考

最近有一个业务场景,需要监听RTK当前状态的变化,业务场景是:

有一个前提,RTK必须先设置账户,才能使用后续的服务。如果用户按照应用准则走,进入主页后,先去设置页面设置RTK,然后回到主页,进入任务执行页,这时候监听RTK state是可用的;但如果用户忘了设置的步骤,或是有强迫症的用户,我就不往你提示的方式走,我就要先进任务执行员页,这时候RTK State监听是不可用的,我们会引导用户进入设置页面设置RTK,这个地方又要分情况,如果用户设置好RTK账户,然后返回了任务页,那么任务页的RTK state监听到了用户设置了账户就会返回可用,那么这次任务是可使用的;但如果用户设置好了账户,想去诊断RTK当前连接的状态的话,RTK state监听事件就会被诊断RTK页面给设置,也就意味着,任务页的RTK state监听就会失效,那么返回任务页的话,任务页是不会有任何反应的。但这一块也是有解决办法的,就是任务页的RTK state监听放在 onResume 方法里面,即设置页面返回任务页后,触发任务页的 onResume 方法,重新夺回RTK state的监听。 办法都是有的,但依赖生命周期去做到这点,感觉并不是特别的可靠,我们可以参考上面事件驱动的例子,将RTK state看成是酸奶机,然后哪个页面(杯子)需要RTK state信息的话,就可以订阅(subcribe)酸奶机,如果想要牛奶的话,就post一个信息出去,告诉酸奶机我要酸奶,下面,我给出一份示例:

Set
set = new HashSet<>(); //模拟一个RTK state 单例 public Observable getObservableInstance() { return Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
emitter) throws Exception { RTK rtk = DjiSettingUtils.getRTK(); rtk.setStateCallback(new RTKState.Callback() { @Override public void onUpdate(@NonNull RTKState rtkState) { emitter.onNext(rtkState); } }); } }); } //驱动事件 public void postEvent(Observer observer) { if (!set.contains(observer)) { getObservableInstance().subscribeOn(Schedulers.io()).subscribe(observer); } } @Test public void onCreate() { Observer observer = new Observer
() {  ... @Override public void onNext(RTKState s) { System.out.println("onNext: " + s.isRTKBeingUsed()); } }; //发送事件 postEvent(observer); }复制代码

之后,我们只需要关注 onCreate 方法,在任务页我们发起一个订阅事件,接收RTK state信息,在诊断页面也发起一个订阅,接收RTK信息,这样就不会像上面那样,抢断监听事件的问题。

多图上传的思考

业务场景中有需要从无人机中读取缩略图,并将缩略图上传至服务器,图片上传我们使用的是七牛云,因为一次任务产生的缩略图非常多,基本上都在百张左右,我们不可能为了在上传过程中,因为某些原因导致了断开了,让用户重新上传所有的缩略图,所以,我们打算让百张缩略图采用顺序上传,当哪个节点发生错误的时候,记住index,等用户点击重新上传时,我们再从index的位置继续上传,如果按照传统方式来做的话,第一张上传成功后,如何通知第二张上传呢,我这里给个大致的代码:

List
list=new ArrayList<>();int index=0;public void uploadPic(){ uploadManager.put(list.get(index), key, token, new UpCompletionHandler() { @Override public void complete(String key, ResponseInfo info, JSONObject res) { if (info.isOK()) {    index++; uploadPic() } else { //弹框提示用户,当前index上传失败 } } }, null);} @Testpublic void test(){ uploadPic()}复制代码

每次上传成功后都调用自身的方法,如果上传失败了,则记住index的位置,提示用户,用户点击重试上传,那么就继续调用 uploadPic 方法,上传的拿到的文件还是从index位置开始拿,所以,也是没有任何问题的。 但是,总觉得这么设计不那么优雅,比如我想知道上传进度的话,那也就意味着我需要在index++方法下面加一个设置进度条的功能,那如果业务需要再加一个上传完成的操作的话,那是不是又要在index++下面多加一个 index==list.size() 的判断呢,其实,这样设计下去的话,整个上传功能就变得特别的松散,移植性也不强,所以,是时候发挥RxJava的 Observer 了。

鉴于异步回调的思考,我打算把上传任务封装成一个 ObservableOnSubcribe ,每次执行任务成功后,就将事件流onNext交给下游,告诉他我完成了一次上传,如果上传失败了,则发射onError异常。

public class QiNiuBitmapOnSubscribe implements ObservableOnSubscribe
{   ...    @Override public void subscribe(final ObservableEmitter
emitter) throws Exception { //上传操作 uploadManager.put(file, key, token, new UpCompletionHandler() { @Override public void complete(String key, ResponseInfo info, JSONObject res) { if (info.isOK()) { emitter.onNext(new QiniuParam(key, info, res)); emitter.onComplete(); } else { emitter.onError(new ServerException(-1, res.toString())); } } }, null); }}复制代码

由于图片是存储在一个集合中,那么就肯定要用到RxJava的 fromIterable 来遍历集合,由于需要保证图片是有序上传,就需要用到 concatMap 操作符 , 所以,大致代码如下

Observable.fromIterable(fileList)                .concatMap(new Function
>() { @Override public ObservableSource
apply(QiNiuFile qiniuFile) throws Exception { //返回七牛云上传 return Observable.create(new QiNiuFileOnSubscribe(uploadManager, qiniuFile.getFile(), qiniuFile.getKey(), qiniuFile.getUploadToken())); } }).subscribe(new Observer
() { ... @Override public void onNext(QiniuParam qiniuParam) { index++; //通知上传进度 uploadCallBack.onUploadProcess(index); } @Override public void onError(Throwable e) { //通知断传的位置 uploadCallBack.onUploadQiNiuError(index); } @Override public void onComplete() { //上传成功 uploadCallBack.onUploadQiNiuComplete(); } });复制代码

对于 Observer 来说,他是一个干净的接收流,他不关心上游发生的事情,只专注结果的处理。

思考

以上思考有的地方可能不是特别的完善,还需要多思考,RxJava用的人确实很多,但要想玩的溜的话,确实任重而道远。

转载地址:http://ajytl.baihongyu.com/

你可能感兴趣的文章
Python 类的一些BIF
查看>>
LCT
查看>>
VIJOS-P1635 城市连接
查看>>
chown命令详情
查看>>
强数学归纳法
查看>>
第三次作业结对编程
查看>>
jQuery总结(摘抄)
查看>>
_stat函数/struct stat 结构体使用笔记
查看>>
二分搜索 HDOJ 2289 Cup
查看>>
Byte[]、Image、Bitmap 之间的相互转换
查看>>
分布式全文检索引擎之ElasticSearch
查看>>
数组名和指针区别
查看>>
实现子数组和绝对值差最小 - Objective-C
查看>>
明天支付宝就开始提现收费了!这几招可以让你受用
查看>>
洛谷P4774 屠龙勇士
查看>>
第一次作业(个人作业)
查看>>
Leetcode | Recover Binary Search Tree
查看>>
springmvc学习第二天
查看>>
Linux IO接口 监控 (iostat)
查看>>
Hadoop阅读笔记(三)——深入MapReduce排序和单表连接
查看>>