博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJava系列番外篇:一个RxJava解决复杂业务逻辑的案例
阅读量:7069 次
发布时间:2019-06-28

本文共 7029 字,大约阅读时间需要 23 分钟。

之前写过一系列RxJava1的文章,也承诺过会尽快有RxJava2的介绍。无奈实际项目中还未真正的使用RxJava2,不敢妄动笔墨。所以这次还是给大家分享一个使用RxJava1解决问题的案例,希望对大家在使用RxJava的时候有一点点启发。对RxJava还不了解的同学可以先去看看我之前的RxJava系列文章:

业务场景

拿这个开源的天气App来举例:

进入App首页后,首先我们需要从数据库中获取当前城市的天气数据,如果数据库中存在天气数据则在UI页面上展示天气数据;如果数据库中未存储当前城市的天气数据,或者已存储的天气数据的发布时间相比现在已经超过了一小时,并且网络属于连接状态则调用API从服务端获取天气数据。如果获取到到的天气数据发布时间和当前数据库中的天气数据发布时间一致则丢弃掉从服务端获取到的天气数据,如果不一致则更新数据库并且在页面上展示最新的天气信息。(同时天气数据源是可配置的,可选择是小米天气数据源还是Know天气数据源)

解决方案

首先我们需要创建一个从数据库获取天气数据的Observable observableForGetWeatherFromDB,同时我们也需要创建一个从API获取天气数据的Observable observableForGetWeatherFromNetWork;为了在无网络状态下免于创建observableForGetWeatherFromNetWork我们在这之前需要首先判断下网络状态。最后使用contact操作符将两个Observable合并,同时使用distincttakeUntil操作符来过滤筛选数据以符合业务需求,然后结合subscribeOnobserveOn做线程切换。上述这一套复杂的业务逻辑如果使用传统编码方式将是极其复杂的。下面我们来看看使用RxJava如何清晰简洁的来实现这个复杂的业务:

Observable
observableForGetWeatherData;//首先创建一个从数据库获取天气数据的ObservableObservable
observableForGetWeatherFromDB = Observable.create(new Observable.OnSubscribe
() { @Override public void call(Subscriber
subscriber) { try { Weather weather = weatherDao.queryWeather(cityId); subscriber.onNext(weather); subscriber.onCompleted(); } catch (SQLException e) { throw Exceptions.propagate(e); } }});if (!NetworkUtils.isNetworkConnected(context)) { observableForGetWeatherData = observableForGetWeatherFromDB;} else { //接着创建一个从网络获取天气数据的Observable Observable
observableForGetWeatherFromNetWork = null; switch (configuration.getDataSourceType()) { case ApiConstants.WEATHER_DATA_SOURCE_TYPE_KNOW: observableForGetWeatherFromNetWork = ApiClient.weatherService.getKnowWeather(cityId) .map(new Func1
() { @Override public Weather call(KnowWeather knowWeather) { return new KnowWeatherAdapter(knowWeather).getWeather(); } }); break; case ApiConstants.WEATHER_DATA_SOURCE_TYPE_MI: observableForGetWeatherFromNetWork = ApiClient.weatherService.getMiWeather(cityId) .map(new Func1
() { @Override public Weather call(MiWeather miWeather) { return new MiWeatherAdapter(miWeather).getWeather(); } }); break; } assert observableForGetWeatherFromNetWork != null; observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork .doOnNext(new Action1
() { @Override public void call(Weather weather) { Schedulers.io().createWorker().schedule(() -> { try { weatherDao.insertOrUpdateWeather(weather); } catch (SQLException e) { throw Exceptions.propagate(e); } }); } }); //使用concat操作符将两个Observable合并 observableForGetWeatherData = Observable.concat(observableForGetWeatherFromDB, observableForGetWeatherFromNetWork) .filter(new Func1
() { @Override public Boolean call(Weather weather) { return weather != null && !TextUtils.isEmpty(weather.getCityId()); } }) .distinct(new Func1
() { @Override public Long call(Weather weather) { return weather.getRealTime().getTime();//如果天气数据发布时间一致,我们再认为是相同的数据从丢弃掉 } }) .takeUntil(new Func1
() { @Override public Boolean call(Weather weather) { return System.currentTimeMillis() - weather.getRealTime().getTime() <= 60 * 60 * 1000;//如果天气数据发布的时间和当前时间差在一小时以内则终止事件流 } });}observableForGetWeatherData.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1
() { @Override public void call(Weather weather) { displayWeatherInformation(); } }, new Action1
() { @Override public void call(Throwable throwable) { Toast.makeText(context, throwable.getMessage(), Toast.LENGTH_LONG).show(); } });

上面的代码看起来比较复杂,我们采用Lambda表达式简化下代码:

Observable
observableForGetWeatherData;//首先创建一个从数据库获取天气数据的ObservableObservable
observableForGetWeatherFromDB = Observable.create(new Observable.OnSubscribe
() { @Override public void call(Subscriber
subscriber) { try { Weather weather = weatherDao.queryWeather(cityId); subscriber.onNext(weather); subscriber.onCompleted(); } catch (SQLException e) { throw Exceptions.propagate(e); } }});if (!NetworkUtils.isNetworkConnected(context)) { observableForGetWeatherData = observableForGetWeatherFromDB;} else { //接着创建一个从网络获取天气数据的Observable Observable
observableForGetWeatherFromNetWork = null; switch (configuration.getDataSourceType()) { case ApiConstants.WEATHER_DATA_SOURCE_TYPE_KNOW: observableForGetWeatherFromNetWork = ApiClient.weatherService.getKnowWeather(cityId) .map(knowWeather -> new KnowWeatherAdapter(knowWeather).getWeather()); break; case ApiConstants.WEATHER_DATA_SOURCE_TYPE_MI: observableForGetWeatherFromNetWork = ApiClient.weatherService.getMiWeather(cityId) .map(miWeather -> new MiWeatherAdapter(miWeather).getWeather()); break; } assert observableForGetWeatherFromNetWork != null; observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork .doOnNext(weather -> Schedulers.io().createWorker().schedule(() -> { try { weatherDao.insertOrUpdateWeather(weather); } catch (SQLException e) { throw Exceptions.propagate(e); } })); //使用concat操作符将两个Observable合并 observableForGetWeatherData = Observable.concat(observableForGetWeatherFromDB, observableForGetWeatherFromNetWork) .filter(weather -> weather != null && !TextUtils.isEmpty(weather.getCityId())) .distinct(weather -> weather.getRealTime().getTime())//如果天气数据发布时间一致,我们再认为是相同的数据从丢弃掉 .takeUntil(weather -> System.currentTimeMillis() - weather.getRealTime().getTime() <= 60 * 60 * 1000);//如果天气数据发布的时间和当前时间差在一小时以内则终止事件流}observableForGetWeatherData.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(weather -> displayWeatherInformation(), throwable -> Toast.makeText(context, throwable.getMessage(), Toast.LENGTH_LONG).show());

小技巧

在上述的实现中有几点是我们需要注意的:

  1. 为什么我需要在判断网络那块整个if else?这样看起来很不优雅,我们通过RxJava符完全可以实现同样的操作啊!之所以这样做是为了在无网络状况下去创建不必要的Observable observableForGetWeatherFromNetWork;

  2. 更新数据库的操作不应该阻塞更新UI,因此我们在observableForGetWeatherFromNetWorkdoOnNext中需要通过Schedulers.io().createWorker()去另起一条线程,以此保证更新数据库不会阻塞更新UI的操作。

有同学可能会问为什么不在doOnNext之后再调用一次observeOn把更新数据库的操作切换到一条新的子线程去操作呢?其实一开始我也是这样做的,后来想想不对。整个Observable的事件传递处理就像是在一条流水线上完成的,虽然我们可以通过observeOn来指定子线程去处理更新数据库的操作,但是只有等这条子线程完成了更新数据库的任务后事件才会继续往后传递,这样就阻塞了更新UI的操作。对此有疑问的同学可以去看看我之前关于RxJava源码分析的文章或者自己动手debug看看。

问题

最后给大家留个两个问题:

  1. 上述代码是最佳实现方案吗?还有什么更加合理的做法?

  2. 我们在observableForGetWeatherData中使用distincttakeUntil过滤筛选天气数据的时候网络请求会不会已经发出去了?这样做还有意义吗?

欢迎大家留言讨论。

本文中的代码在中的WeatherDataRepository类中有同样的实现,文章中为了更完整的将整个实现过程呈现出来,对代码做了部分改动。

如果大家喜欢这一系列的文章,欢迎关注我的知乎专栏、Github以及简书。

  • 知乎专栏:

  • GitHub:

  • 简书:

转载地址:http://uuell.baihongyu.com/

你可能感兴趣的文章
C语言 线性表 顺序表结构 实现
查看>>
SQLLoader7(只导入数据文件的其中几行记录)
查看>>
Vim 中使用cscope
查看>>
HR系统+人脸识别
查看>>
RabbitMQ与AMQP协议详解
查看>>
metronic后台模板学习 -- 所用外部插件列表
查看>>
微软原版SQLHelper类
查看>>
首页设计的可用性和PET
查看>>
mongodb的分布式集群(1、主从复制)
查看>>
http://www.cnblogs.com/yaozhenfa/archive/2015/06/14/4574898.html
查看>>
动态为DropDownList添加Item
查看>>
spring NotWritablePropertyException异常
查看>>
iOS开发证书"此证书的签发者无效"解决方法
查看>>
Python实现的通用树结构,支持节点索引,常数时间查找
查看>>
网络传输协议
查看>>
iOS Principle:Category
查看>>
Java多线程之synchronized增强版——ReentrantLock
查看>>
MVP设计模式应该这样掌握
查看>>
Git标签的管理和配置命令别名
查看>>
对UIView,UIWindow和CALayer的理解
查看>>