- Published on
RxJS(二):掌握 RxJS 操作符与订阅管理
- Authors
- Name
- 青雲
在上一篇文章中,我们介绍了 RxJS 的基础知识和核心概念,讨论了 Observables、Observers 和 Subscriptions 如何在响应式编程中发挥作用。这篇文章将深入探讨 RxJS 的操作符和订阅管理,帮助你更好地掌握 RxJS 的强大功能,实现复杂的响应式编程逻辑。
引言
回顾 RxJS 基础概念
在继续深入之前,简要回顾一下 RxJS 的基础概念,包括 Observable(可观察对象)、Observer(观察者)、Subscription(订阅)等。通过这些概念,我们能够创建、管理和订阅数据流。
操作符的重要性
操作符(Operators)在 RxJS 中起着至关重要的作用。它们是对 Observable 进行转换、过滤、组合等操作的核心工具,使得处理复杂的数据流变得更加简洁和高效。
操作符详解
转换操作符
转换操作符是将一个 Observable 发出的值通过某种方式转换,并发出新的 Observable。这些操作符是 RxJS 的核心功能之一,通过变化数据流来实现异步编程的复杂逻辑。
map
map
操作符对 Observable 发出的每个值应用一个函数,并发出函数的结果。它类似于数组的 map
方法,但适用于 Observable。
import { of } from 'rxjs'
import { map } from 'rxjs/operators'
const numbers$ = of(1, 2, 3) // 创建一个发出数字的 Observable
const squaredNumbers$ = numbers$.pipe(map((x) => x * x)) // 对每个值应用平方函数
squaredNumbers$.subscribe((value) => console.log(value)) // 输出: 1, 4, 9
mapTo
mapTo
操作符将每个发出的值映射为同一个常量值。
import { of } from 'rxjs'
import { mapTo } from 'rxjs/operators'
const numbers$ = of(1, 2, 3) // 创建一个发出数字的 Observable
const mappedNumbers$ = numbers$.pipe(mapTo('Map')) // 将每个值映射为字符串 "Map"
mappedNumbers$.subscribe((value) => console.log(value)) // 输出: Map, Map, Map
switchMap
switchMap
操作符将发出的每个值映射为一个新的 Observable,并取消订阅之前的内部 Observable,当源 Observable 发出新值时。
import { fromEvent, interval } from 'rxjs'
import { switchMap } from 'rxjs/operators'
const clicks$ = fromEvent(document, 'click') // 创建一个从点击事件中发出值的 Observable
const result$ = clicks$.pipe(switchMap(() => interval(1000))) // 将每个点击事件映射为新的 interval Observable
result$.subscribe((value) => console.log(value)) // 每次点击后,开始新的计时器并取消上一个计时器
mergeMap
mergeMap
操作符将每个发出的值映射成一个新的 Observable,并合并所有这些内部 Observables,即并发地处理它们。
import { of } from 'rxjs'
import { mergeMap } from 'rxjs/operators'
const letters$ = of('a', 'b', 'c') // 创建一个发出字母的 Observable
const result$ = letters$.pipe(mergeMap((letter) => of(letter + '1', letter + '2'))) // 将每个值映射为两个发出新值的 Observables
result$.subscribe((value) => console.log(value)) // 输出: a1, a2, b1, b2, c1, c2
concatMap
concatMap
操作符将每个发出的值映射成一个新的 Observable,并按顺序订阅这些内部 Observables。
import { of } from 'rxjs'
import { concatMap, delay } from 'rxjs/operators'
const letters$ = of('a', 'b', 'c') // 创建一个发出字母的 Observable
const result$ = letters$.pipe(concatMap((letter) => of(letter).pipe(delay(1000)))) // 将每个值映射为一个延迟发出的 Observable
result$.subscribe((value) => console.log(value)) // 每隔1秒依次输出: a, b, c
过滤操作符
过滤操作符用于根据条件筛选 Observable 中的数据流。通过这些操作符,我们可以控制哪些数据可以通过流动,而哪些数据会被丢弃。
filter
filter
操作符基于一个谓词函数过滤Observable 发出的值,只发出通过过滤条件的值。
import { of } from 'rxjs'
import { filter } from 'rxjs/operators'
const numbers$ = of(1, 2, 3, 4, 5) // 创建一个发出数字的 Observable
const filteredNumbers$ = numbers$.pipe(filter((x) => x % 2 === 0)) // 过滤掉奇数,只发出偶数
filteredNumbers$.subscribe((value) => console.log(value)) // 输出: 2, 4
first
first
操作符只发出 Observable 发出的第一个值,并完成。如果指定了谓词,则发出第一个符合条件的值。
import { of } from 'rxjs'
import { first } from 'rxjs/operators'
const numbers$ = of(1, 2, 3, 4, 5) // 创建一个发出数字的 Observable
const firstNumber$ = numbers$.pipe(first()) // 只发出第一个值
firstNumber$.subscribe((value) => console.log(value)) // 输出: 1
last
last
操作符只发出 Observable 发出的最后一个值。如果指定了谓词,则发出最后一个符合条件的值。
import { of } from 'rxjs'
import { last } from 'rxjs/operators'
const numbers$ = of(1, 2, 3, 4, 5) // 创建一个发出数字的 Observable
const lastNumber$ = numbers$.pipe(last()) // 只发出最后一个值
lastNumber$.subscribe((value) => console.log(value)) // 输出: 5
take
take
操作符只发出前 n 个值,然后完成
import { of } from 'rxjs'
import { take } from 'rxjs/operators'
const numbers$ = of(1, 2, 3, 4, 5) // 创建一个发出数字的 Observable
const firstThree$ = numbers$.pipe(take(3)) // 只发出前3个值
firstThree$.subscribe((value) => console.log(value)) // 输出: 1, 2, 3
skip
skip
操作符跳过前 n 个值,然后发出后续值。
import { of } from 'rxjs'
import { skip } from 'rxjs/operators'
const numbers$ = of(1, 2, 3, 4, 5) // 创建一个发出数字的 Observable
const skippedFirstTwo$ = numbers$.pipe(skip(2)) // 跳过前2个值
skippedFirstTwo$.subscribe((value) => console.log(value)) // 输出: 3, 4, 5
组合操作符
组合操作符用于组合多个 Observable,从而创建一个新的 Observable。这些操作符允许你灵活地处理多个数据流,使得你可以更高效地处理复杂的异步操作。
merge
merge
操作符将多个 Observables 合并为一个 Observable,以并发的方式发出值。它会订阅所有的输入 Observable,并将它们发出的所有值作为单个 Observable 发出。
import { of, merge } from 'rxjs'
import { delay } from 'rxjs/operators'
// 创建两个 Observables,第二个 Observable 会延迟发出值
const observable1 = of('a', 'b', 'c') // 第一个源 Observable
const observable2 = of('1', '2', '3').pipe(delay(1000)) // 第二个源 Observable,每个值延迟1秒发出
// 使用 merge 将两个 Observables 合并起来
const merged$ = merge(observable1, observable2)
// 订阅并输出值
merged$.subscribe((value) => console.log(value))
// 输出: a, b, c (来自于observable1)
// 输出: 1, 2, 3 (在1秒后, 来自于observable2,且并发发出)
concat
concat
操作符将多个 Observables 串联到一起来发出值,这些 Observables 会按顺序触发,前一个 Observable 完成后才会开始发出下一个 Observable 的值。
import { of, concat } from 'rxjs'
// 创建两个 Observables
const observable1 = of('a', 'b', 'c') // 第一个源 Observable
const observable2 = of('1', '2', '3') // 第二个源 Observable
// 使用 concat 将两个 Observables 串联起来
const concatenated$ = concat(observable1, observable2)
// 订阅并输出值
concatenated$.subscribe((value) => console.log(value))
// 输出: a, b, c, 1, 2, 3,只有在observable1的所有值都发出后, observable2才会发出它的值
combineLatest
combineLatest
操作符会等待所有输入 Observable 都发出至少一个值,然后每当其中任意一个 Observable 发出值时,将最新的每个 Observable 发出的值作为数组组合并发出。
import { timer, combineLatest } from 'rxjs'
import { map } from 'rxjs/operators'
// 创建两个 timer Observable
const timer1$ = timer(0, 1000).pipe(map((value) => `Timer1: ${value}`)) // 第一个值立即发出,然后每隔 1 秒继续发出递增的值,并标记是 Timer1 的值
const timer2$ = timer(500, 1000).pipe(map((value) => `Timer2: ${value}`)) // 第一个值在 500 毫秒后发出,然后每隔 1 秒继续发出递增的值,并标记是 Timer2 的值
// 组合两个 Observable,发出最新的组合值
const combined$ = combineLatest([timer1$, timer2$])
// 订阅并输出组合后的值
combined$.subscribe((value) => console.log(value))
// 初次组合值输出: ['Timer1: 0', 'Timer2: 0']
// 每次 Timer1 或 Timer2 发出新值时,输出组合后的最新值: ['Timer1: 1', 'Timer2: 0'], ['Timer1: 1', 'Timer2: 1'], ['Timer1: 2', 'Timer2: 1'], ...
- t = 0 ms:
timer1$
发出Timer1: 0
timer2$
还未发出值,combineLatest$
不会发出值
- t = 500 ms:
timer1$
保持不变,但timer2$
发出Timer2: 0
combineLatest$
输出[Timer1: 0, Timer2: 0]
- t = 1000 ms:
timer1$
发出Timer1: 1
timer2$
保持Timer2: 0
combineLatest$
输出[Timer1: 1, Timer2: 0]
- t = 1500 ms:
timer1$
保持Timer1: 1
timer2$
发出Timer2: 1
combineLatest$
输出[Timer1: 1, Timer2: 1]
- t = 2000 ms:
timer1$
发出Timer1: 2
timer2$
保持Timer2: 1
combineLatest$
输出[Timer1: 2, Timer2: 1]
withLatestFrom
withLatestFrom
操作符会结合主 Observable 的值和其他 Observable 最新发出的值,并作为数组发出。
import { fromEvent, interval } from 'rxjs'
import { withLatestFrom } from 'rxjs/operators'
const clicks$ = fromEvent(document, 'click') // 点击事件 Observable
const timer$ = interval(1000) // 每秒递增的 Observable
const result$ = clicks$.pipe(withLatestFrom(timer$)) // 结合点击事件和最新的计时值
result$.subscribe((value) => console.log(value)) // 每次点击时输出:[MouseEvent, timer的最新值]
创建操作符
创建操作符用于生成新的 Observable,它们是 RxJS 中非常重要的一部分。这些操作符能够帮助我们轻松创建 Observable,以适应不同的需求。
startWith
startWith
操作符在 Observable 发出任何值之前首先发出一个特定的值。它可以用于提供初始值或默认值。
import { of } from 'rxjs'
import { startWith } from 'rxjs/operators'
// 创建一个发出1、2、3的Observable,并在开始时先发出0
const numbers$ = of(1, 2, 3).pipe(startWith(0))
// 订阅并输出值
numbers$.subscribe((value) => console.log(value)) // 输出: 0, 1, 2, 3
timer
timer
操作符会在指定时间后发出一个值。如果提供两个参数,则它还会接着以指定的时间间隔发出值。
import { timer } from 'rxjs'
// 创建一个在2秒后发出第一个值的Observable
const singleTimer$ = timer(2000)
// 订阅并输出值
singleTimer$.subscribe((value) => console.log('Single Timer:', value)) // 2秒后输出: Single Timer: 0
// 创建一个在2秒后发出第一个值,然后每隔1秒发出值的Observable
const intervalTimer$ = timer(2000, 1000)
// 订阅并输出值
intervalTimer$.subscribe((value) => console.log('Interval Timer:', value)) // 2秒后第一次输出: Interval Timer: 0, 然后每隔1秒递增: Interval Timer: 1, 2, 3, ...
Marble Diagrams:
时间(s): 0-1-2-3-4-5-6-7-
timer(2000): ----0--|
timer(2000, 1000): ----0--1--2--3--|
range
range
操作符创建一个发出特定范围值序列的 Observable。它允许我们指定起始值和数量。
import { range } from 'rxjs'
// 创建一个从5开始,发出5个连续整数的Observable
const numbers$ = range(5, 5)
// 订阅并输出值
numbers$.subscribe((value) => console.log(value)) // 输出: 5, 6, 7, 8, 9
Marble Diagrams:
range(5, 5): --5--6--7--8--9--|
错误处理操作符
在处理 Observable 的过程中,错误处理是一个重要的部分。RxJS 提供了几种错误处理操作符,帮助我们优雅地处理和恢复来自 Observable 的错误流。
catchError
catchError
操作符用于捕获源 Observable 中的错误,并返回一个新的 Observable 或抛出一个错误。这使我们能够从错误中恢复并继续流的处理。
import { of, throwError } from 'rxjs'
import { catchError } from 'rxjs/operators'
const source$ = throwError('An error occurred!') // 创建一个发出错误的Observable
const handled$ = source$.pipe(
catchError((error) => {
console.error('Caught error:', error)
return of('Default value') // 返回一个新的 Observable 作为替换
})
)
// 订阅并输出值
handled$.subscribe((value) => console.log(value)) // 输出: Caught error: An error occurred! 默认值: Default value
Marble Diagrams:
Source Observable: ----#|
catchError: ----(Default value)|
retry
retry
操作符用于在源 Observable 遇到错误时自动重新订阅。可以指定重新尝试的次数,超过次数后如果仍然发生错误,则错误将被最终传递出去。
import { of, Observable } from 'rxjs'
import { retry, catchError } from 'rxjs/operators'
let count = 0
const source$ = new Observable((observer) => {
if (count++ < 2) {
observer.error('Error occurred')
} else {
observer.next('Success')
observer.complete()
}
})
const retrying$ = source$.pipe(
retry(2), // 重新尝试2次
catchError((error) => of(`Caught final error after 2 retries: ${error}`))
)
// 订阅并输出值
retrying$.subscribe((value) => console.log(value))
// 第三次应该成功,因此输出: "Success"
Marble Diagrams:
Source Observable:
----# (第一次: 发射错误)
----# (第二次: 重试后发射错误)
----(Success)| (第三次: 重试后成功,并完成)
retry
操作符通过 retryOptions
来配置重试行为,包括最大重试次数和重试通知等。我们可以使用它来实现自定义重试逻辑。
import { of, timer, throwError, Observable } from 'rxjs'
import { retry, catchError, tap } from 'rxjs/operators'
// 假设我们有一个可能失败的过程
let failureCount = 0
const unreliableObservable = new Observable((observer) => {
if (failureCount < 2) {
observer.error('Simulated failure')
failureCount++
} else {
observer.next('Success')
observer.complete()
}
})
// 使用retryOptions进行重试控制
const retrying$ = unreliableObservable.pipe(
retry({
count: 3, // 最大重试次数
delay: (error, retryCount) => {
console.log(`Retry attempt ${retryCount}, error: ${error}`)
return timer(1000 * retryCount) // 重试时延时间
},
}),
catchError((error) => of(`Final error after retries: ${error}`))
)
// 订阅并输出值
retrying$.subscribe((value) => console.log(value))
// 输出:
// Retry attempt 1, error: Simulated failure
// Retry attempt 2, error: Simulated failure
// Success
Marble Diagrams:
Source Observable: ----#|
1st retry (with delay): ----1s----#|
2nd retry (with delay): ----2s----#|
3rd retry (with delay): ----3s----(Success)|
辅助操作符
辅助操作符(Utility Operators)用于辅助其他操作符或提供便利的方法。它们通常不改变数据流的内容,而是增强可观察对象(Observable)的行为。
tap
tap
操作符在 Observable 的每个值被发出时执行副作用操作。它主要用于调试和日志记录,帮助你观察数据流而不改变数据本身。
import { of } from 'rxjs'
import { tap, map } from 'rxjs/operators'
// 创建一个发出数字的 Observable,并记录每个值
const numbers$ = of(1, 2, 3).pipe(
tap((value) => console.log('Before map:', value)), // 输出: 'Before map: 1', 'Before map: 2', 'Before map: 3'
map((value) => value * 10), // 转换数据流的值
tap((value) => console.log('After map:', value)) // 打印转换后的值: 'After map: 10', 'After map: 20', 'After map: 30'
)
// 订阅并输出值
numbers$.subscribe((value) => console.log('Final value:', value)) // 输出: 10, 20, 30
delay
delay
操作符将延迟 Observable 的发射时间。它用于控制发射的时序,可以为基于时间的操作提供便利。
import { of } from 'rxjs'
import { delay } from 'rxjs/operators'
// 创建一个延迟发出值的 Observable
const delayed$ = of('Hello').pipe(delay(2000)) // 延迟 2 秒发出值
delayed$.subscribe((value) => console.log(value)) // 2 秒后输出: 'Hello'
Marble Diagrams:
Source Observable: --(Hello)--|
delay(2000): ----(Hello)--|
timeout
timeout
操作符如果在指定时间内没有发出值,将会抛出一个超时错误。这对于防止数据流卡住并提供超时处理非常有用。
import { of, timer } from 'rxjs'
import { timeout, catchError } from 'rxjs/operators'
// 创建一个发出延迟值的 Observable
const lateObservable$ = timer(3000).pipe(
timeout(2000), // 如果超过 2 秒没有发出值,则抛出超时错误
catchError((error) => of(`Error: ${error.message}`)) // 捕获并处理错误
)
lateObservable$.subscribe((value) => console.log(value)) // 2 秒后输出: 'Error: Timeout has occurred'
Marble Diagrams:
Source Observable: -------(value)|
timeout(2000): -----(TimeoutError)--|
多播和分享 Observable
多播和分享 Observable 是 RxJS 中非常重要的概念,它们允许多个订阅者共享一个单独的 Observable 执行,避免重复订阅和重复执行。通过多播和分享操作符,我们可以优化性能,提高代码的可读性和可维护性。
connect
connect
操作符通过在 Observable 中多播数据流的方式,允许开发者定义在连接前如何使用多播的 Observable。适用于需要手动多播的场景。
import { of, tap, connect, merge, map, filter } from 'rxjs'
// 源 Observable
const source$ = of(1, 2, 3, 4, 5).pipe(
tap({
subscribe: () => console.log('subscription started'),
next: (n) => console.log(`source emitted ${n}`),
})
)
// 使用 connect 进行多播
source$
.pipe(
connect((shared$) =>
merge(
shared$.pipe(map((n) => `all ${n}`)),
shared$.pipe(
filter((n) => n % 2 === 0),
map((n) => `even ${n}`)
),
shared$.pipe(
filter((n) => n % 2 !== 0),
map((n) => `odd ${n}`)
)
)
)
)
.subscribe(console.log)
share
share
操作符是使用最广泛的操作符之一。它是 multicast 和 refCount 的简写。在有第一个订阅者时开始订阅原始 Observable,并在所有订阅者取消订阅时停止订阅。
import { interval } from 'rxjs'
import { share, tap, take, map } from 'rxjs/operators'
// 创建一个每秒发出一个值的 Observable
const source$ = interval(1000).pipe(
tap((x) => console.log('Processing:', x)),
map((x) => `Stream ${x}`),
take(5),
share()
)
// 第一个订阅者
source$.subscribe((x) => console.log('Subscription 1:', x))
// 延迟 2 秒后的第二个订阅者开始订阅
setTimeout(() => {
source$.subscribe((x) => console.log('Subscription 2:', x))
}, 2000)
shareReplay
shareReplay 操作符不仅能多播数据流,还能重播指定数量的最近发出的值给新的订阅者。这对于需要新订阅者获取以前发出的值的场景非常有用。
import { interval } from 'rxjs'
import { shareReplay, take } from 'rxjs/operators'
// 创建一个每秒发出一个值的 Observable
const source$ = interval(1000).pipe(
take(6),
shareReplay(3) // 重播最近的3个值
)
source$.subscribe((x) => console.log('Sub A:', x))
source$.subscribe((x) => console.log('Sub B:', x))
// 延迟后第三个订阅者开始订阅
setTimeout(() => {
source$.subscribe((x) => console.log('Sub C:', x))
}, 11000)
Subject
Subject 是一种特殊类型的 Observable,它允许多播值给多个观察者。Subject 是观察者,也是可观察对象。
Subject
Subject
是最基础的 Subject 类型,它既是一个 Observable,也是一个 Observer,可以多播值给多个订阅者。
import { Subject } from 'rxjs'
const subject = new Subject<number>()
// 订阅者 1
subject.subscribe({
next: (v) => console.log(`Observer 1: ${v}`),
})
// 订阅者 2
subject.subscribe({
next: (v) => console.log(`Observer 2: ${v}`),
})
// 通过 subject 发出值
subject.next(1)
subject.next(2)
Marble Diagrams:
Subject: --1--2--|
Observer 1: ------1--2--|
Observer 2: ------1--2--|
BehaviorSubject
BehaviorSubject
是一种特殊的 Subject,它总是保存最新的值,并将在新的观察者订阅时立即发出这个值。
import { BehaviorSubject } from 'rxjs'
const behaviorSubject = new BehaviorSubject<number>(0) // 初始值为 0
// 订阅者 1
behaviorSubject.subscribe({
next: (v) => console.log(`Observer 1: ${v}`),
})
// 通过 BehaviorSubject 发出值
behaviorSubject.next(1)
behaviorSubject.next(2)
// 订阅者 2
behaviorSubject.subscribe({
next: (v) => console.log(`Observer 2: ${v}`),
})
behaviorSubject.next(3)
ReplaySubject
ReplaySubject
缓存所有发出的值,并将这些值重播给新的观察者。可以指定缓冲区大小来限制重放的值个数。
import { ReplaySubject } from 'rxjs'
const replaySubject = new ReplaySubject<number>(2) // 缓存最近 2 个值
// 订阅者 1
replaySubject.subscribe({
next: (v) => console.log(`Observer 1: ${v}`),
})
// 通过 ReplaySubject 发出值
replaySubject.next(1)
replaySubject.next(2)
replaySubject.next(3)
// 订阅者 2
replaySubject.subscribe({
next: (v) => console.log(`Observer 2: ${v}`),
})
replaySubject.next(4)
AsyncSubject
AsyncSubject
只会在完成时发出最后一个值,然后完成。
import { AsyncSubject } from 'rxjs'
const asyncSubject = new AsyncSubject<number>()
// 订阅者 1
asyncSubject.subscribe({
next: (v) => console.log(`Observer 1: ${v}`),
})
// 通过 AsyncSubject 发出值
asyncSubject.next(1)
asyncSubject.next(2)
asyncSubject.next(3)
// 订阅者 2
asyncSubject.subscribe({
next: (v) => console.log(`Observer 2: ${v}`),
})
// 发出 complete
asyncSubject.complete()
结论
我们在本章中详细探讨了 RxJS 中的重要操作符和管理订阅的最佳实践。通过多个小节逐步深入,我们覆盖了变换、过滤、组合、创建、错误处理、辅助操作符以及多播与分享 Observable 和 Subject 的详细知识。 下一篇文章将深入探讨 RxJS 的高级应用和实战案例,如果你有任何问题或想法,欢迎交流和探讨!