Что такое Observable и зачем он используется?
Observable — это основной строительный блок реактивного программирования в Angular и библиотеке RxJS. Observable представляет собой поток данных, который может быть асинхронным, ленивым, многократным и реактивным. Он позволяет обрабатывать данные, события, ответы от сервера, пользовательские действия и любые другие источники изменений во времени как потоки, на которые можно подписываться, трансформировать и комбинировать.
1. Что такое Observable
Observable — это объект, представляющий отложенную последовательность значений, которые поступают во времени. Он аналогичен Promise, но с рядом отличий:
Характеристика | Observable | Promise |
---|---|---|
Множественные значения | ✅ | ❌ |
--- | --- | --- |
Ленивость | ✅ (не исполняется без подписки) | ❌ (исполняется сразу) |
--- | --- | --- |
Отмена | ✅ (unsubscribe) | ❌ (требуется внешний механизм) |
--- | --- | --- |
Операторы | ✅ (map, filter, etc.) | ❌ |
--- | --- | --- |
Пример простейшего Observable:
import { Observable } from 'rxjs';
const obs = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.complete();
});
2. Использование Observable в Angular
2.1. HttpClient возвращает Observable
Большинство встроенных сервисов Angular используют Observable, особенно HttpClient:
import { HttpClient } from '@angular/common/http';
constructor(private http: HttpClient) {}
getData() {
this.http.get('/api/data').subscribe(data => {
console.log(data);
});
}
3. Подписка на Observable (subscribe())
Чтобы Observable начал испускать значения, нужно подписаться:
obs.subscribe({
next: value => console.log(value),
error: err => console.error(err),
complete: () => console.log('Готово')
});
Можно использовать сокращённую форму:
obs.subscribe(value => console.log(value));
4. Ленивость и отложенное выполнение
Observable не исполняется до тех пор, пока на него не подпишутся. Это позволяет гибко управлять временем начала операций.
const obs = new Observable(observer => {
console.log('Observable инициализирован');
observer.next('Привет');
});
// До вызова .subscribe() ничего не произойдёт
obs.subscribe(console.log); // Теперь "Привет"
5. Операторы RxJS
RxJS предоставляет мощные операторы, позволяющие работать с потоками:
-
map(fn) — преобразование значения;
-
filter(fn) — фильтрация значений;
-
switchMap() — отмена предыдущего потока при запуске нового;
-
mergeMap() — параллельное объединение потоков;
-
concatMap() — последовательная обработка;
-
tap() — промежуточное действие без изменения данных;
-
debounceTime(), distinctUntilChanged() — контроль частоты событий.
Пример:
this.http.get('/api/user')
.pipe(
map(res => res\['data'\]),
tap(data => console.log('Данные:', data))
)
.subscribe();
6. Комбинирование потоков
RxJS позволяет объединять несколько Observable в один:
import { combineLatest, interval } from 'rxjs';
const a$ = interval(1000);
const b$ = interval(1500);
combineLatest(\[a$, b$\]).subscribe((\[a, b\]) => {
console.log('A:', a, 'B:', b);
});
Также есть forkJoin, zip, merge, concat и другие.
7. Observable в реактивных формах
Reactive Forms в Angular используют Observable для отслеживания изменений:
this.form.get('email')?.valueChanges
.pipe(debounceTime(300))
.subscribe(value => {
console.log('Email изменён:', value);
});
8. Управление подписками и отписка
Каждая подписка возвращает объект Subscription, у которого есть метод unsubscribe():
const sub = obs.subscribe();
sub.unsubscribe(); // завершает поток
Автоматическая отписка
Рекомендуется отписываться в ngOnDestroy:
export class MyComponent implements OnDestroy {
sub!: Subscription;
ngOnInit() {
this.sub = this.service.getData().subscribe();
}
ngOnDestroy() {
this.sub.unsubscribe();
}
}
Или использовать takeUntil, take, first, async pipe.
9. async pipe в шаблоне
Angular предоставляет async пайп, который автоматически подписывается и отписывается от Observable:
@Component({ template: \`<div>{{ user$ | async }}</div>\` })
export class AppComponent {
user$ = this.api.getUser();
}
Позволяет использовать Observable напрямую в шаблоне без подписки в компоненте.
10. Создание своих Observable
RxJS позволяет создавать Observable вручную:
const stream$ = new Observable(observer => {
let count = 0;
const id = setInterval(() => {
observer.next(count++);
if (count > 5) observer.complete();
}, 1000);
return () => clearInterval(id); // функция очистки
});
Также есть утилиты:
-
of(1,2,3) — поток значений;
-
from([1,2,3]) — из массива;
-
interval(1000) — таймер;
-
timer(3000) — отложенное значение;
-
fromEvent(dom, 'click') — из события.
11. Сравнение с другими подходами
Подход | Когда использовать |
---|---|
Callback | Простые единичные события |
--- | --- |
Promise | Однократные асинхронные операции |
--- | --- |
Observable | Множественные, асинхронные, управляемые потоки |
--- | --- |
12. Использование в state management
Observable активно используется в:
-
NgRx (store.select())
-
SignalStore (computed(() => stream.value))
-
Custom state service (BehaviorSubject)
13. Multicasting и Subjects
Если нужно поделиться данными между несколькими подписчиками — используют Subject, BehaviorSubject, ReplaySubject, AsyncSubject.
const subject = new BehaviorSubject(0);
subject.subscribe(v => console.log('A:', v));
subject.next(1);
subject.subscribe(v => console.log('B:', v)); // получит 1 сразу
14. Завершение Observable
Observable может:
-
испускать значения (next)
-
выбросить ошибку (error)
-
завершиться (complete)
После complete() или error() подписка автоматически разрывается.