Chapter 07

RxJS 响应式编程

Observable 数据流、操作符管道与 Subject——Angular 异步编程的基石

1. Observable 核心概念

Observable 是 RxJS 的核心抽象——代表一个随时间产生的值序列。与 Promise 不同,Observable 可以发出多个值,支持取消订阅,且是懒执行的(不订阅就不执行)。

2. 创建 Observable

import { Observable, of, from, interval, timer, Subject, BehaviorSubject } from 'rxjs';

// of:从固定值创建
const nums$ = of(1, 2, 3); // 依次发出 1, 2, 3 然后 complete

// from:从数组/Promise/Iterable 创建
const arr$ = from([10, 20, 30]);
const promise$ = from(fetch('/api/data')); // Promise 转 Observable

// interval:每隔 n 毫秒发一个数字
const tick$ = interval(1000); // 0, 1, 2... 每秒一次(不会 complete)

// timer:延迟后发出,可选重复
const delayed$ = timer(2000); // 2秒后发出 0 然后 complete

// Subject:手动控制的广播
const events$ = new Subject<string>();
events$.next('click'); // 主动推送

// BehaviorSubject:有初始值,新订阅者立即收到最新值
const count$ = new BehaviorSubject(0); // 初始值 0
count$.next(1);
count$.value; // 同步读取当前值:1

3. 转换操作符

map — 变换每个值

import { map, filter, take, tap } from 'rxjs/operators';

this.userService.getUsers().pipe(
  map(users => users.filter(u => u.role === 'admin')),   // 过滤管理员
  map(admins => admins.map(a => ({ id: a.id, name: a.name }))),  // 精简字段
  tap(data => console.log('调试:', data))  // 调试,不影响流
).subscribe(admins => this.admins = admins);

switchMap vs mergeMap vs concatMap

这三个操作符都将源 Observable 的每个值映射为一个新的 Observable,但处理并发的方式不同:

操作符并发策略典型场景
switchMap只保留最新,取消之前搜索框(只关心最新输入)
mergeMap并行,不取消并行上传多个文件
concatMap串行,排队执行按顺序发送操作
exhaustMap忽略新值直到当前完成提交按钮防重复
import { switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs/operators';

// switchMap:搜索框,用户继续输入时取消前一次请求
searchInput$.pipe(
  debounceTime(300),
  switchMap(query => this.api.search(query)) // 新搜索来了,旧请求取消
);

// mergeMap:并行处理,所有结果都保留
fileIds$.pipe(
  mergeMap(id => this.api.uploadFile(id)) // 同时上传多个文件
);

// concatMap:按顺序串行执行
actions$.pipe(
  concatMap(action => this.api.processAction(action)) // 按顺序处理
);

// exhaustMap:提交表单防重复
submitClicks$.pipe(
  exhaustMap(() => this.api.submit(data)) // 请求进行中时忽略新点击
);

4. 合并操作符

import { combineLatest, forkJoin, merge, zip } from 'rxjs';

// combineLatest:任一 Observable 发值就合并最新值
// 适合:多个过滤条件联动
combineLatest([
  this.selectedCategory$,
  this.searchQuery$,
  this.currentPage$
]).pipe(
  switchMap(([category, query, page]) =>
    this.api.getProducts({ category, query, page })
  )
);

// forkJoin:等所有 Observable 都 complete,取每个的最后一个值
// 适合:并行请求多个 API,等全部完成
forkJoin({
  user: this.userService.getUser(id),
  orders: this.orderService.getUserOrders(id),
  stats: this.statsService.getUserStats(id)
}).subscribe(({ user, orders, stats }) => {
  this.user = user;
  this.orders = orders;
  this.stats = stats;
});

// merge:多个 Observable 合并为一个流(互不等待)
merge(click$, keyup$).subscribe(event => console.log('交互:', event));

5. 取消订阅与内存泄漏

⚠️

内存泄漏警告:在 Angular 组件中订阅 Observable 后,必须在组件销毁时取消订阅(infinite streams 如 interval、Subject)。否则即使组件销毁,回调仍会执行,导致内存泄漏。

import { Component, OnDestroy, inject } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';

@Component({ /* ... */ })
export class MyComponent implements OnDestroy {
  // 方法一:takeUntil + destroy$ Subject(推荐)
  private destroy$ = new Subject<void>();

  ngOnInit() {
    interval(1000).pipe(
      takeUntil(this.destroy$)  // 组件销毁时自动停止
    ).subscribe(tick => console.log(tick));
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// 方法二:使用 AsyncPipe(模板中自动订阅/取消订阅)
// 在模板中:{{ data$ | async }}

// 方法三:toSignal(Angular 16+,自动管理订阅)
import { toSignal } from '@angular/core/rxjs-interop';

export class ModernComponent {
  // toSignal 在组件销毁时自动取消订阅
  data = toSignal(this.service.getData(), { initialValue: [] });
}

6. 实战:实时搜索 + 购物车状态

@Injectable({ providedIn: 'root' })
export class CartService {
  private items$ = new BehaviorSubject<CartItem[]>([]);

  // 公开的只读 Observable
  cart$ = this.items$.asObservable();
  count$ = this.cart$.pipe(map(items => items.length));
  total$ = this.cart$.pipe(
    map(items => items.reduce((sum, i) => sum + i.price * i.qty, 0))
  );

  addItem(item: Product) {
    const current = this.items$.value;
    const existing = current.find(i => i.id === item.id);
    if (existing) {
      this.items$.next(
        current.map(i => i.id === item.id ? { ...i, qty: i.qty + 1 } : i)
      );
    } else {
      this.items$.next([...current, { ...item, qty: 1 }]);
    }
  }

  removeItem(id: number) {
    this.items$.next(this.items$.value.filter(i => i.id !== id));
  }
}

本章小结:RxJS 是 Angular 异步编程的核心工具。Observable 处理数据流,操作符链处理转换,Subject 实现多播状态管理。takeUntil、AsyncPipe 或 toSignal 是防止内存泄漏的三种主要方式。