博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[译] RxJS 调度器入门
阅读量:7039 次
发布时间:2019-06-28

本文共 2554 字,大约阅读时间需要 8 分钟。

原文链接:

本文为 翻译文章,如需转载,请注明出处,谢谢合作!
如果你也想和我们一起,翻译更多优质的 RxJS 文章以奉献给大家,请点击

RxJS 中的调度器 ( Schedulers ) 是用来控制事件发出的顺序和速度的(发送给观察者的)。它还可以控制订阅 ( Subscriptions ) 的顺序。为了不搞得太理论化,先考虑下这个示例:

const a$ = Rx.Observable.of(1, 2);const b$ = Rx.Observable.of(10);const c$ = Rx.Observable.combineLatest(a$, b$, (a, b) => a + b);c$.subscribe(c => console.log(c));复制代码

你觉得控制台输出的结果是什么呢?大多数人会猜是:

1112复制代码

因为首先 a$1会和 b$ 中的10配对,然后 a$ 中的2b$ 中的10配对。

事实上,出现在控制台中的是:

12复制代码

1 + 10 的组合并没有发生。原因是 Observables a$b$ 都是“同步的”,它们会尽可能快地执行。那么事件发出的顺序到底是怎样的呢?答案是不确定的,它可能是以下任意一种:

  • 1, 2, 10
  • 1, 10, 2
  • 10, 1, 2

在这种顺序不确定的情况下,我们应该描述出事件的发出顺序是怎样的。这就是调度器所做的事。默认情况下,RxJS 使用所谓的递归调度器。下面是它的工作原理:

  1. c$ 被订阅
  2. combineLatest 的第一个输入流 a$ 被订阅
  3. a$ 发出值 1
  4. combineLatest 将 1 作为 a$ 的最新值进行保存
  5. a$ 发出值 2
  6. combineLatest 将 2 作为 a$ 的最新值进行保存
  7. combineLatest 的第二个输入流 b$ 被订阅
  8. b$ 发出值 10
  9. combineLatest 将 10 作为 b$ 的最新值进行保存
  10. combineLatest 现在同时拥有了 a$ 和 b$ 的值,因此它发出值 2 + 10

发出的顺序为 1, 2, 10 。最有意思的是在 b$ 被订阅前, 将 a$ 的所有事件都尽快地发出了。RxJS 使用这种调度器作为默认调度器出于两点原因:

  • 使用此策略性能的整体表现更好
  • 在调试工具中更易于进行堆栈跟踪

然而,可以通过使用不同的调度器来自定义事件发出的顺序及速度。我们在 a$ 上使用 asap 调度器来让其“慢下来”:

// const a$ = Rx.Observable.of(1, 2);const a$ = Rx.Observable.from([1, 2], Rx.Scheduler.asap); // 新代码const b$ = Rx.Observable.of(10);const c$ = Rx.Observable.combineLatest(a$, b$, (a, b) => a + b);c$.subscribe(c => console.log(c))复制代码

from 操作符的第二个参数是调度器,用来自定义事件的发出。asap 调度器使用 来安排任务尽快运行,但不是同步的。代码改变后,控制台会输出:

1112复制代码

因为内部运行顺序如下:

  1. c$ 被订阅
  2. combineLatest 的第一个输入流 a$ 被订阅
  3. combineLatest 的第二个输入流 b$ 被订阅
  4. b$ 发出值 10
  5. combineLatest 将 10 作为 b$ 的最新值进行保存
  6. a$ 发出值 1
  7. combineLatest 将 1 作为 a$ 的最新值进行保存
  8. combineLatest 现在同时拥有了 a$ 和 b$ 的值,因此它发出值 1 + 10
  9. a$ 发出值 2
  10. combineLatest 将 2 作为 a$ 的最新值进行保存
  11. combineLatest 发出值 2 + 10

发出的顺序为 10, 1, 2 。为了得到另外一种发出顺序,可以为 b$ 也自定义调度器:

const a$ = Rx.Observable.from([1, 2], Rx.Scheduler.asap);// const b$ = Rx.Observable.of(10);const b$ = Rx.Observable.from([10], Rx.Scheduler.asap); // 新代码const c$ = Rx.Observable.combineLatest(a$, b$, (a, b) => a + b);c$.subscribe(c => console.log(c));复制代码

现在发出的顺序为 1, 10, 2,因为运行顺序如下:

  1. c$ 被订阅
  2. combineLatest 的第一个输入流 a$ 被订阅
  3. combineLatest 的第二个输入流 b$ 被订阅
  4. a$ 发出值 1
  5. combineLatest 将 1 作为 a$ 的最新值进行保存
  6. b$ 发出值 10
  7. combineLatest 将 10 作为 b$ 的最新值进行保存
  8. combineLatest 现在同时拥有了 a$ 和 b$ 的值,因此它发出值 1 + 10
  9. a$ 发出值 2
  10. combineLatest 将 2 作为 a$ 的最新值进行保存
  11. combineLatest 发出值 2 + 10

调度器还可以让事件的发出变得更快,同时保持发出的顺序不变。例如,RxJS 的 TestScheduler 可以使 Observable.interval(1000).take(10) 被订阅时进行同步执行,而不需要花费10秒钟来完成:

Rx.Observable.interval(1000, new Rx.TestScheduler()).take(10)复制代码

TestScheduler 是在 RxJS 内部使用的 (),它使得成百上千个时间相关的测试代码飞快地运行,但有一些像 这样的工具和积极的讨论来丰富此调度器的使用场景,使得在 RxJS 内部之外的地方也可以使用。

如果你也喜欢本文,可以考虑将此分享给你的关注者。

你可能感兴趣的文章
仿饿了么项目-vue的学习笔记总目录
查看>>
Angular 2.x+ 如何动态装载组件
查看>>
React中的setTimeout、setInterval的注意事项
查看>>
如何深入使用scss开发一个简单页面
查看>>
JS学习系列 03 - 函数作用域和块作用域
查看>>
外卖订单爬虫(美团,饿了么,百度外卖)
查看>>
用Flink取代Spark Streaming,知乎实时数仓架构演进
查看>>
2019年值得关注的八大DevOps趋势
查看>>
教育部下令中小学推广编程教育,全民AI真的要来了
查看>>
C#未来新特性:静态委托和函数指针
查看>>
从Python2到Python3:超百万行代码迁移实践
查看>>
如何避免移动测试自动化失败
查看>>
Real World Kanban作者访谈
查看>>
Confluent平台5.0支持LDAP授权及用于IoT集成的MQTT代理
查看>>
Yelp开源数据管道项目最新组件——数据管道客户端库
查看>>
ASP.NET 2.2 Preview 1首次支持Java SignalR客户端
查看>>
开源项目越来越商业友好,谁来负责开发者友好呢?
查看>>
flask 源码解析:路由
查看>>
《F# Deep Dives》书评与作者问答
查看>>
Spring命名空间解析
查看>>