1. Observable 核心概念
Observable 是 RxJS 的核心抽象——代表一个随时间产生的值序列。与 Promise 不同,Observable 可以发出多个值,支持取消订阅,且是懒执行的(不订阅就不执行)。
-
Observable
可观察的数据流,像一条"管道",能发出多个值(next)、一个错误(error)或完成信号(complete)。通过
subscribe()开始接收数据。 -
Observer
订阅 Observable 的对象,包含三个可选回调:
next(value)(接收值)、error(err)(处理错误)、complete()(完成时调用)。 -
Subscription
subscribe() 返回的对象,调用
subscription.unsubscribe()可取消订阅,防止内存泄漏。Angular 组件销毁时必须取消订阅。 -
Operator
纯函数,接受一个 Observable,返回新的 Observable,用于在
pipe()中进行数据转换、过滤、合并等操作。 -
Subject
既是 Observable 也是 Observer。可以通过
subject.next(value)主动推送值,实现多播(多个订阅者收到同一个值)。
2. 创建 Observable
import { Observable, of, from, interval, timer, Subject, BehaviorSubject } from 'rxjs';
// of:从固定值创建
const nums$ = of(1, 2, 3); // 依次发出 1, 2, 3 然后 complete
// from:从数组/Promise/Iterable 创建
const arr$ = from([10, 20, 30]);
const promise$ = from(fetch('/api/data')); // Promise 转 Observable
// interval:每隔 n 毫秒发一个数字
const tick$ = interval(1000); // 0, 1, 2... 每秒一次(不会 complete)
// timer:延迟后发出,可选重复
const delayed$ = timer(2000); // 2秒后发出 0 然后 complete
// Subject:手动控制的广播
const events$ = new Subject<string>();
events$.next('click'); // 主动推送
// BehaviorSubject:有初始值,新订阅者立即收到最新值
const count$ = new BehaviorSubject(0); // 初始值 0
count$.next(1);
count$.value; // 同步读取当前值:1
3. 转换操作符
map — 变换每个值
import { map, filter, take, tap } from 'rxjs/operators';
this.userService.getUsers().pipe(
map(users => users.filter(u => u.role === 'admin')), // 过滤管理员
map(admins => admins.map(a => ({ id: a.id, name: a.name }))), // 精简字段
tap(data => console.log('调试:', data)) // 调试,不影响流
).subscribe(admins => this.admins = admins);
switchMap vs mergeMap vs concatMap
这三个操作符都将源 Observable 的每个值映射为一个新的 Observable,但处理并发的方式不同:
| 操作符 | 并发策略 | 典型场景 |
|---|---|---|
switchMap | 只保留最新,取消之前 | 搜索框(只关心最新输入) |
mergeMap | 并行,不取消 | 并行上传多个文件 |
concatMap | 串行,排队执行 | 按顺序发送操作 |
exhaustMap | 忽略新值直到当前完成 | 提交按钮防重复 |
import { switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs/operators';
// switchMap:搜索框,用户继续输入时取消前一次请求
searchInput$.pipe(
debounceTime(300),
switchMap(query => this.api.search(query)) // 新搜索来了,旧请求取消
);
// mergeMap:并行处理,所有结果都保留
fileIds$.pipe(
mergeMap(id => this.api.uploadFile(id)) // 同时上传多个文件
);
// concatMap:按顺序串行执行
actions$.pipe(
concatMap(action => this.api.processAction(action)) // 按顺序处理
);
// exhaustMap:提交表单防重复
submitClicks$.pipe(
exhaustMap(() => this.api.submit(data)) // 请求进行中时忽略新点击
);
4. 合并操作符
import { combineLatest, forkJoin, merge, zip } from 'rxjs';
// combineLatest:任一 Observable 发值就合并最新值
// 适合:多个过滤条件联动
combineLatest([
this.selectedCategory$,
this.searchQuery$,
this.currentPage$
]).pipe(
switchMap(([category, query, page]) =>
this.api.getProducts({ category, query, page })
)
);
// forkJoin:等所有 Observable 都 complete,取每个的最后一个值
// 适合:并行请求多个 API,等全部完成
forkJoin({
user: this.userService.getUser(id),
orders: this.orderService.getUserOrders(id),
stats: this.statsService.getUserStats(id)
}).subscribe(({ user, orders, stats }) => {
this.user = user;
this.orders = orders;
this.stats = stats;
});
// merge:多个 Observable 合并为一个流(互不等待)
merge(click$, keyup$).subscribe(event => console.log('交互:', event));
5. 取消订阅与内存泄漏
内存泄漏警告:在 Angular 组件中订阅 Observable 后,必须在组件销毁时取消订阅(infinite streams 如 interval、Subject)。否则即使组件销毁,回调仍会执行,导致内存泄漏。
import { Component, OnDestroy, inject } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';
@Component({ /* ... */ })
export class MyComponent implements OnDestroy {
// 方法一:takeUntil + destroy$ Subject(推荐)
private destroy$ = new Subject<void>();
ngOnInit() {
interval(1000).pipe(
takeUntil(this.destroy$) // 组件销毁时自动停止
).subscribe(tick => console.log(tick));
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
// 方法二:使用 AsyncPipe(模板中自动订阅/取消订阅)
// 在模板中:{{ data$ | async }}
// 方法三:toSignal(Angular 16+,自动管理订阅)
import { toSignal } from '@angular/core/rxjs-interop';
export class ModernComponent {
// toSignal 在组件销毁时自动取消订阅
data = toSignal(this.service.getData(), { initialValue: [] });
}
6. 实战:实时搜索 + 购物车状态
@Injectable({ providedIn: 'root' })
export class CartService {
private items$ = new BehaviorSubject<CartItem[]>([]);
// 公开的只读 Observable
cart$ = this.items$.asObservable();
count$ = this.cart$.pipe(map(items => items.length));
total$ = this.cart$.pipe(
map(items => items.reduce((sum, i) => sum + i.price * i.qty, 0))
);
addItem(item: Product) {
const current = this.items$.value;
const existing = current.find(i => i.id === item.id);
if (existing) {
this.items$.next(
current.map(i => i.id === item.id ? { ...i, qty: i.qty + 1 } : i)
);
} else {
this.items$.next([...current, { ...item, qty: 1 }]);
}
}
removeItem(id: number) {
this.items$.next(this.items$.value.filter(i => i.id !== id));
}
}
本章小结:RxJS 是 Angular 异步编程的核心工具。Observable 处理数据流,操作符链处理转换,Subject 实现多播状态管理。takeUntil、AsyncPipe 或 toSignal 是防止内存泄漏的三种主要方式。