Как обрабатывать асинхронные операции с RxJS?

RxJS (Reactive Extensions for JavaScript) — это библиотека для реактивного программирования с использованием Observable-паттерна. Она позволяет эффективно обрабатывать асинхронные операции, такие как HTTP-запросы, события пользователя, таймеры и WebSocket-соединения. RxJS широко используется в Angular для управления потоками данных и обработки побочных эффектов.

1. Observable как основа асинхронной модели

Observable — это объект, который представляет поток данных во времени. Он может быть синхронным или асинхронным.

Простой пример асинхронного Observable:

const obs$ = new Observable(observer => {
setTimeout(() => observer.next('data'), 1000);
});

Подписка на поток:

obs$.subscribe(value => console.log(value));

2. Создание Observable

of(), from()

of(1, 2, 3).subscribe(); // Синхронные значения
from(fetch('/api')).subscribe(); // Promise -> Observable

interval(), timer()

interval(1000).subscribe(val => console.log(val)); // каждые 1 секунду

3. Обработка HTTP-запросов через HttpClient (возвращает Observable)

this.http.get('/api/users')
.pipe(
retry(3), // повторить до 3 раз при ошибке
catchError(err => of(\[\])) // обработка ошибок
)
.subscribe(users => this.users = users);

4. Операторы RxJS

map()

Преобразует поток:

source$.pipe(map(value => value \* 2));

filter()

Фильтрует значения:

source$.pipe(filter(x => x > 5));

switchMap()

Отменяет предыдущий поток и подписывается на новый:

searchInput$.pipe(
debounceTime(300),
switchMap(query => this.api.search(query))
);

Используется для поиска, автокомплита и отменяемых запросов.

mergeMap()

Запускает все вложенные Observable параллельно:

ids$.pipe(
mergeMap(id => this.api.getById(id))
);

concatMap()

Выполняет последовательно:

tasks$.pipe(concatMap(task => this.api.process(task)));

exhaustMap()

Игнорирует новые запросы до завершения текущего:

loginClicks$.pipe(
exhaustMap(() => this.authService.login())
);

5. Комбинирование потоков

combineLatest()

Комбинирует последние значения:

combineLatest(\[form.valueChanges, this.user$\])
.subscribe((\[formData, user\]) => ...);

forkJoin()

Ожидает завершения всех потоков (аналог Promise.all):

forkJoin(\[
this.api.getUser(),
this.api.getSettings()
\]).subscribe((\[user, settings\]) => ...);

6. Управление подписками

takeUntil()

Автоматическое завершение подписки при событии:

private destroy$ = new Subject();
ngOnInit() {
this.api.getData().pipe(
takeUntil(this.destroy$)
).subscribe();
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}

take()

Ограничение количества элементов:

interval(1000).pipe(take(5)).subscribe(console.log);

7. Ошибки и повторные попытки

catchError()

Обработка ошибок:

this.api.getData().pipe(
catchError(err => {
console.error(err);
return of(\[\]); // возвращаем альтернативу
})
)

retry(), retryWhen()

this.api.getData().pipe(retry(2)) // 2 попытки

С отложенной повторной попыткой:

retryWhen(errors =>
errors.pipe(
delay(1000),
take(3)
)
)

8. Subject как источник событий

Subject — это и Observable, и Observer.

const subject = new Subject<number>();
subject.subscribe(val => console.log('A:', val));
subject.subscribe(val => console.log('B:', val));
subject.next(1); // A:1, B:1

Используется для событий (например, нажатий кнопок) или как механизм связи между компонентами.

9. BehaviorSubject

Хранит последнее значение и отправляет его новым подписчикам.

const count$ = new BehaviorSubject<number>(0);
count$.next(1);
count$.subscribe(val => console.log(val)); // сразу получит 1

Полезен для хранения состояний (например, логина).

10. ReplaySubject и AsyncSubject

ReplaySubject(n) — повторяет n последних значений новым подписчикам.

const r$ = new ReplaySubject(2);
r$.next(1); r$.next(2); r$.next(3);
r$.subscribe(console.log); // 2, 3

AsyncSubject — передаёт только последнее значение после complete().

11. Работа с формами и событиями

fromEvent(inputEl, 'input')
.pipe(
map(e => e.target.value),
debounceTime(300),
distinctUntilChanged()
)
.subscribe(value => this.search(value));

12. Высокоуровневые подходы

  • Инкапсуляция логики в facade services.

  • Использование async пайпа (| async) в шаблоне — подписка/отписка автоматическая.

  • Состояние — через BehaviorSubject, Signals или NgRx.

  • combineLatest, withLatestFrom — для реактивных зависимостей между данными.

13. Обработка последовательных асинхронных шагов

this.api.step1().pipe(
switchMap(res1 => this.api.step2(res1)),
switchMap(res2 => this.api.step3(res2)),
).subscribe(final => console.log(final));

14. Декларативность в компонентах

Вместо подписок — использование async:

data$ = this.api.getData();
<!-- шаблон -->
<div \*ngIf="data$ | async as data">
{{ data.name }}
</div>

Это избавляет от необходимости вручную управлять отписками.

RxJS обеспечивает мощную, гибкую и декларативную модель обработки асинхронных потоков в Angular. Он позволяет управлять любыми событиями, задержками, конкурентными запросами и состоянием приложения без колбэков и Promise-адов.