Как обрабатывать асинхронные операции с RxJS?
RxJS (Reactive Extensions for JavaScript) — это библиотека для реактивного программирования с использованием Observable-паттерна. Она позволяет эффективно обрабатывать асинхронные операции, такие как HTTP-запросы, события пользователя, таймеры и WebSocket-соединения. RxJS широко используется в Angular для управления потоками данных и обработки побочных эффектов.
1. Observable как основа асинхронной модели
Observable — это объект, который представляет поток данных во времени. Он может быть синхронным или асинхронным.
Простой пример асинхронного Observable:
const obs$ = new Observable(observer => {
setTimeout(() => observer.next('data'), 1000);
});
Подписка на поток:
obs$.subscribe(value => console.log(value));
2. Создание Observable
of(), from()
of(1, 2, 3).subscribe(); // Синхронные значения
from(fetch('/api')).subscribe(); // Promise -> Observable
interval(), timer()
interval(1000).subscribe(val => console.log(val)); // каждые 1 секунду
3. Обработка HTTP-запросов через HttpClient (возвращает Observable)
this.http.get('/api/users')
.pipe(
retry(3), // повторить до 3 раз при ошибке
catchError(err => of(\[\])) // обработка ошибок
)
.subscribe(users => this.users = users);
4. Операторы RxJS
map()
Преобразует поток:
source$.pipe(map(value => value \* 2));
filter()
Фильтрует значения:
source$.pipe(filter(x => x > 5));
switchMap()
Отменяет предыдущий поток и подписывается на новый:
searchInput$.pipe(
debounceTime(300),
switchMap(query => this.api.search(query))
);
Используется для поиска, автокомплита и отменяемых запросов.
mergeMap()
Запускает все вложенные Observable параллельно:
ids$.pipe(
mergeMap(id => this.api.getById(id))
);
concatMap()
Выполняет последовательно:
tasks$.pipe(concatMap(task => this.api.process(task)));
exhaustMap()
Игнорирует новые запросы до завершения текущего:
loginClicks$.pipe(
exhaustMap(() => this.authService.login())
);
5. Комбинирование потоков
combineLatest()
Комбинирует последние значения:
combineLatest(\[form.valueChanges, this.user$\])
.subscribe((\[formData, user\]) => ...);
forkJoin()
Ожидает завершения всех потоков (аналог Promise.all):
forkJoin(\[
this.api.getUser(),
this.api.getSettings()
\]).subscribe((\[user, settings\]) => ...);
6. Управление подписками
takeUntil()
Автоматическое завершение подписки при событии:
private destroy$ = new Subject();
ngOnInit() {
this.api.getData().pipe(
takeUntil(this.destroy$)
).subscribe();
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
take()
Ограничение количества элементов:
interval(1000).pipe(take(5)).subscribe(console.log);
7. Ошибки и повторные попытки
catchError()
Обработка ошибок:
this.api.getData().pipe(
catchError(err => {
console.error(err);
return of(\[\]); // возвращаем альтернативу
})
)
retry(), retryWhen()
this.api.getData().pipe(retry(2)) // 2 попытки
С отложенной повторной попыткой:
retryWhen(errors =>
errors.pipe(
delay(1000),
take(3)
)
)
8. Subject как источник событий
Subject — это и Observable, и Observer.
const subject = new Subject<number>();
subject.subscribe(val => console.log('A:', val));
subject.subscribe(val => console.log('B:', val));
subject.next(1); // A:1, B:1
Используется для событий (например, нажатий кнопок) или как механизм связи между компонентами.
9. BehaviorSubject
Хранит последнее значение и отправляет его новым подписчикам.
const count$ = new BehaviorSubject<number>(0);
count$.next(1);
count$.subscribe(val => console.log(val)); // сразу получит 1
Полезен для хранения состояний (например, логина).
10. ReplaySubject и AsyncSubject
ReplaySubject(n) — повторяет n последних значений новым подписчикам.
const r$ = new ReplaySubject(2);
r$.next(1); r$.next(2); r$.next(3);
r$.subscribe(console.log); // 2, 3
AsyncSubject — передаёт только последнее значение после complete().
11. Работа с формами и событиями
fromEvent(inputEl, 'input')
.pipe(
map(e => e.target.value),
debounceTime(300),
distinctUntilChanged()
)
.subscribe(value => this.search(value));
12. Высокоуровневые подходы
-
Инкапсуляция логики в facade services.
-
Использование async пайпа (| async) в шаблоне — подписка/отписка автоматическая.
-
Состояние — через BehaviorSubject, Signals или NgRx.
-
combineLatest, withLatestFrom — для реактивных зависимостей между данными.
13. Обработка последовательных асинхронных шагов
this.api.step1().pipe(
switchMap(res1 => this.api.step2(res1)),
switchMap(res2 => this.api.step3(res2)),
).subscribe(final => console.log(final));
14. Декларативность в компонентах
Вместо подписок — использование async:
data$ = this.api.getData();
<!-- шаблон -->
<div \*ngIf="data$ | async as data">
{{ data.name }}
</div>
Это избавляет от необходимости вручную управлять отписками.
RxJS обеспечивает мощную, гибкую и декларативную модель обработки асинхронных потоков в Angular. Он позволяет управлять любыми событиями, задержками, конкурентными запросами и состоянием приложения без колбэков и Promise-адов.