Что такое Observable и зачем он используется?

Observable — это основной строительный блок реактивного программирования в Angular и библиотеке RxJS. Observable представляет собой поток данных, который может быть асинхронным, ленивым, многократным и реактивным. Он позволяет обрабатывать данные, события, ответы от сервера, пользовательские действия и любые другие источники изменений во времени как потоки, на которые можно подписываться, трансформировать и комбинировать.

1. Что такое Observable

Observable — это объект, представляющий отложенную последовательность значений, которые поступают во времени. Он аналогичен Promise, но с рядом отличий:

Характеристика Observable Promise
Множественные значения
--- --- ---
Ленивость ✅ (не исполняется без подписки) ❌ (исполняется сразу)
--- --- ---
Отмена ✅ (unsubscribe) ❌ (требуется внешний механизм)
--- --- ---
Операторы ✅ (map, filter, etc.)
--- --- ---

Пример простейшего Observable:

import { Observable } from 'rxjs';
const obs = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.complete();
});

2. Использование Observable в Angular

2.1. HttpClient возвращает Observable

Большинство встроенных сервисов Angular используют Observable, особенно HttpClient:

import { HttpClient } from '@angular/common/http';
constructor(private http: HttpClient) {}
getData() {
this.http.get('/api/data').subscribe(data => {
console.log(data);
});
}

3. Подписка на Observable (subscribe())

Чтобы Observable начал испускать значения, нужно подписаться:

obs.subscribe({
next: value => console.log(value),
error: err => console.error(err),
complete: () => console.log('Готово')
});

Можно использовать сокращённую форму:

obs.subscribe(value => console.log(value));

4. Ленивость и отложенное выполнение

Observable не исполняется до тех пор, пока на него не подпишутся. Это позволяет гибко управлять временем начала операций.

const obs = new Observable(observer => {
console.log('Observable инициализирован');
observer.next('Привет');
});
// До вызова .subscribe() ничего не произойдёт
obs.subscribe(console.log); // Теперь "Привет"

5. Операторы RxJS

RxJS предоставляет мощные операторы, позволяющие работать с потоками:

  • map(fn) — преобразование значения;

  • filter(fn) — фильтрация значений;

  • switchMap() — отмена предыдущего потока при запуске нового;

  • mergeMap() — параллельное объединение потоков;

  • concatMap() — последовательная обработка;

  • tap() — промежуточное действие без изменения данных;

  • debounceTime(), distinctUntilChanged() — контроль частоты событий.

Пример:

this.http.get('/api/user')
.pipe(
map(res => res\['data'\]),
tap(data => console.log('Данные:', data))
)
.subscribe();

6. Комбинирование потоков

RxJS позволяет объединять несколько Observable в один:

import { combineLatest, interval } from 'rxjs';
const a$ = interval(1000);
const b$ = interval(1500);
combineLatest(\[a$, b$\]).subscribe((\[a, b\]) => {
console.log('A:', a, 'B:', b);
});

Также есть forkJoin, zip, merge, concat и другие.

7. Observable в реактивных формах

Reactive Forms в Angular используют Observable для отслеживания изменений:

this.form.get('email')?.valueChanges
.pipe(debounceTime(300))
.subscribe(value => {
console.log('Email изменён:', value);
});

8. Управление подписками и отписка

Каждая подписка возвращает объект Subscription, у которого есть метод unsubscribe():

const sub = obs.subscribe();
sub.unsubscribe(); // завершает поток

Автоматическая отписка

Рекомендуется отписываться в ngOnDestroy:

export class MyComponent implements OnDestroy {
sub!: Subscription;
ngOnInit() {
this.sub = this.service.getData().subscribe();
}
ngOnDestroy() {
this.sub.unsubscribe();
}
}

Или использовать takeUntil, take, first, async pipe.

9. async pipe в шаблоне

Angular предоставляет async пайп, который автоматически подписывается и отписывается от Observable:

@Component({ template: \`<div>{{ user$ | async }}</div>\` })
export class AppComponent {
user$ = this.api.getUser();
}

Позволяет использовать Observable напрямую в шаблоне без подписки в компоненте.

10. Создание своих Observable

RxJS позволяет создавать Observable вручную:

const stream$ = new Observable(observer => {
let count = 0;
const id = setInterval(() => {
observer.next(count++);
if (count > 5) observer.complete();
}, 1000);
return () => clearInterval(id); // функция очистки
});

Также есть утилиты:

  • of(1,2,3) — поток значений;

  • from([1,2,3]) — из массива;

  • interval(1000) — таймер;

  • timer(3000) — отложенное значение;

  • fromEvent(dom, 'click') — из события.

11. Сравнение с другими подходами

Подход Когда использовать
Callback Простые единичные события
--- ---
Promise Однократные асинхронные операции
--- ---
Observable Множественные, асинхронные, управляемые потоки
--- ---

12. Использование в state management

Observable активно используется в:

  • NgRx (store.select())

  • SignalStore (computed(() => stream.value))

  • Custom state service (BehaviorSubject)

13. Multicasting и Subjects

Если нужно поделиться данными между несколькими подписчиками — используют Subject, BehaviorSubject, ReplaySubject, AsyncSubject.

const subject = new BehaviorSubject(0);
subject.subscribe(v => console.log('A:', v));
subject.next(1);
subject.subscribe(v => console.log('B:', v)); // получит 1 сразу

14. Завершение Observable

Observable может:

  • испускать значения (next)

  • выбросить ошибку (error)

  • завершиться (complete)

После complete() или error() подписка автоматически разрывается.