RxJava—takeUntil实践

泡在网上的日子 / 文 发表于2016-08-11 03:16 次阅读 RxJava

原文:RxJava — Practical takeUntil Example。 

上周我遇到了这样的问题:

-我需要发送多个同一类型的model到后端
- 后端没有一次发送所有model的API
- 一旦后端返回成功的响应则停止发送model对象

因为model对象的数据源已经是响应式的并且我已经有了一个响应式的向后端发送model的方法,我决定继续响应式的方式。

我想什么操作符在这里最合理呢,于是想起了 takeUntil

自文档:

Returns an Observable that emits items emitted by the source Observable, checks the specified predicate
for each item, and then completes if the condition is satisfied.

听起来不错的样子,那么具体该如何建立呢?

首先让我们得到需要发送的model。

modelProvider.getItems()

这是我们的源Observable。下一件事就是为每个对象发起后端请求。这里 FlatMap就派上用场了。

modelProvider.getItems()
    .flatMap(retroApiInterface::doBackendRequest)

我这里使用 Retrofit发送后端请求,它将返回一个发出匹配响应的Observable。

假设我们的 Retrofit Observable发射的model有一个判断是否成功的函数。我们要做的就是检查它。如果是成功则应该停止向后端发送更多的请求。如果不是则继续。那么就让我们把takeUntil操作符用起来。

modelProvider.getItems()
    .flatMap(retroApiInterface::doBackendRequest)
    .takeUntil(response -> response.isSuccessful())

现在一旦响应是成功它将自动终止源Observable于是停止向后端发送数据。如果不是则继续发送更多请求。

可能有人认为我们已经完成了但是实际上还有两种情况没有处理:

- 如果没有item从源Observable发出?
- 如果后端根本不会返回成功的响应?

两种情况下我们都会在Subscriber的onComplete中结束,这也许不是我们想要的。我们可以使用 lastOrDefault操作符来修复。

modelProvider.getItems()
    .flatMap(retroApiInterface::doBackendRequest)
    .takeUntil(response -> response.isSuccessful())
    .lastOrDefault(ServerResponse.createUnsuccessful())

我们想要得到Observable发出的最后一个然后flatMapp到Retrofit Observable。如果源Observable并没有发出我们将自己创建一个无效的响应。

既然我们只发出一个值,我们就能把它转换成Single。

modelProvider.getItems()
    .flatMap(retroApiInterface::doBackendRequest)
    .takeUntil(response -> response.isSuccessful())
    .lastOrDefault(ServerResponse.createUnsuccessful())
    .toSingle()

现在我们就可以订阅和处理被发出的响应了和错误了。

modelProvider.getItems()
    .flatMap(retroApiInterface::doBackendRequest)
    .takeUntil(response -> response.isSuccessful())
    .lastOrDefault(ServerResponse.createUnsuccessful())
    .toSingle()
    .subscribe(response -> {
        if (response.isSuccessful()) {
            // We made it.
        } else {
            // Not successful.
        }
    }, throwable -> {
        // Some error happened along the way.
    })

就是这样。以reactive的方式,这个问题只用了很少的代码就被解决了。期待回复。

注意:为了保持简洁,我省略了整个Scheduling。通常你需要使用Scheduler把后端请求放在后台一旦订阅再切换到UI。

更新:就如Ivan Škorić所指出的,我们可以让代码更短,不使用takeUntil,而使用firstOrDefault:

modelProvider.getItems()
    .flatMap(retroApiInterface::doBackendRequest)
    .firstOrDefault(ServerResponse.createUnsuccessful(), response -> response.isSuccessful())
    .toSingle()


收藏 赞 (6) 踩 (0)
上一篇:Styling Colors & Drawables w/ Theme Attributes
当你写入类似下面代码的时候: context.getResources().getColor(R.color.some_color_resource_id); 你很可能注意到Android Studio会给你“Resources#getColor(int)方法在Marshmallow已经过时,请使用新的Resources#getColor(int, Theme)方法”的警告。也许
下一篇:RxJava:操作符和Subject的线程安全
原文: RxJava: thread safety of the Operators and Subjects 绝大多数RxJava Operators 和 Subjects 都不是线程安全的。 RxJava 很棒,除了并发。这个话题我很早就想讨论了。 Observable 协议 Observables must issue notifications to observers serially