This content originally appeared on DEV Community and was authored by Serif COLAKEL
Reactive programming has become one of the most powerful paradigms in modern web and mobile development. RxJS (Reactive Extensions for JavaScript) is a library that brings the power of reactive programming to JavaScript, making it easier to work with asynchronous data streams.
This article will take you from beginner concepts to advanced techniques, provide real-world examples, highlight best practices, and show you how RxJS can be used in Angular, React, React Native, and Vue applications.
Table of Contents
- What is RxJS?
- Core Building Blocks of RxJS
- Common RxJS Operators
- Best Practices
- Advanced Concepts
- RxJS in Frameworks
  - 6.1 Angular
  - 6.2 React
  - 6.3 React Native
  - 6.4 Vue
- Testing RxJS
- Real-World Use Cases
- Conclusion
- Further Reading
1. What is RxJS?
RxJS is a library for composing asynchronous and event-based programs by using Observables. It gives you tools to handle:
- Events (clicks, keypresses, scroll)
- HTTP requests
- WebSockets
- Intervals and timers
- Any asynchronous data source
Instead of using callbacks or promises, RxJS lets you work with streams. You can think of RxJS as a “Lodash for events”, providing a robust set of operators to handle asynchronous events as if they were collections.
1.1 Observables vs Promises
Feature | Promise | Observable |
---|---|---|
Value Model | Emits a single value | Can emit zero, one, or multiple values |
Execution | Eager (starts immediately) | Lazy (starts when subscribed) |
Cancellation | Not cancellable | Can unsubscribe |
Operators | Limited chaining | Rich library (map, filter, retry, etc.) |
Observables are ideal for continuous or cancellable streams like user input, while Promises are better for one-time operations like HTTP requests.
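The eager/lazy row of the table can be illustrated with a minimal sketch. The `log` array and the variable names are only for illustration; the point is the order in which the producer functions run.

```typescript
import { Observable } from "rxjs";

const log: string[] = [];

// A Promise executor runs as soon as the Promise is constructed (eager).
const promise = new Promise<number>((resolve) => {
  log.push("promise executor ran");
  resolve(1);
});
promise.then((value) => console.log("promise resolved with", value));

// An Observable's producer runs only when someone subscribes (lazy),
// and it may push more than one value.
const numbers$ = new Observable<number>((subscriber) => {
  log.push("observable producer ran");
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

log.push("before subscribe");
numbers$.subscribe((value) => log.push(`value ${value}`));

console.log(log);
```

The Promise entry appears in `log` before "before subscribe", while the Observable producer only runs after `subscribe` is called.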
2. Core Building Blocks of RxJS
- Observable → Represents a data stream that can emit multiple values.
- Observer → Defines how to react to next, error, and complete signals.
- Subscription → Created when subscribing; can be cancelled.
- Operators → Functions that transform, filter, or combine streams.
- Subjects → Both an Observable and an Observer, useful for multicasting.
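As a rough sketch of how these pieces fit together, the snippet below wires an Observable, an operator, an Observer, and a Subscription by hand. The `events` array and the teardown message are illustrative assumptions, not part of RxJS.

```typescript
import { Observable } from "rxjs";
import { map } from "rxjs/operators";

const events: string[] = [];

// Observable: a producer that pushes two values and never completes.
const ticks$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  // Teardown: runs when the subscription is cancelled.
  return () => events.push("teardown");
});

// Observer: callbacks for the three kinds of signal.
const observer = {
  next: (value: number) => events.push(`next ${value}`),
  error: (err: unknown) => events.push(`error ${String(err)}`),
  complete: () => events.push("complete"),
};

// Operator: map transforms each value before it reaches the observer.
// Subscription: the handle returned by subscribe().
const subscription = ticks$.pipe(map((x) => x * 10)).subscribe(observer);
subscription.unsubscribe();

console.log(events);
```

Because the stream never completes, the teardown only runs once `unsubscribe()` is called.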
Cold vs Hot Observables
- Cold → Like Netflix: each subscriber gets their own stream (e.g., HTTP request).
- Hot → Like a movie theater: one stream shared by everyone (e.g., stock ticker).
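A small sketch of the difference, using a hand-written cold Observable and a plain Subject as the hot source (the counters and array names are hypothetical):

```typescript
import { Observable, Subject } from "rxjs";

// Cold: the producer function runs once per subscriber.
let executions = 0;
const cold$ = new Observable<number>((subscriber) => {
  executions += 1;
  subscriber.next(executions);
  subscriber.complete();
});

const coldA: number[] = [];
const coldB: number[] = [];
cold$.subscribe((v) => coldA.push(v));
cold$.subscribe((v) => coldB.push(v));

// Hot: one shared producer; a subscriber only sees values emitted
// while it is subscribed.
const hot$ = new Subject<string>();
const early: string[] = [];
const late: string[] = [];
hot$.subscribe((v) => early.push(v));
hot$.next("tick 1");
hot$.subscribe((v) => late.push(v));
hot$.next("tick 2");

console.log({ coldA, coldB, early, late });
```

Each cold subscriber triggers its own execution, while the late hot subscriber misses "tick 1".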
3. Common RxJS Operators
Operators are categorized for easier understanding.
3.1 Creation Operators
- of → Creates an observable from static values.
import { of } from "rxjs";
of(1, 2, 3).subscribe(console.log);
- fromEvent → Creates an observable from DOM events.
import { fromEvent } from "rxjs";
fromEvent(document, "click").subscribe(() => console.log("Clicked!"));
3.2 Transformation Operators
- map → Transforms each emitted value.
import { of } from "rxjs";
import { map } from "rxjs/operators";
of(1, 2, 3)
.pipe(map((x) => x * 2))
.subscribe(console.log);
- mergeMap → Maps to new observables and flattens them concurrently.
import { fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
import { mergeMap } from "rxjs/operators";
fromEvent(document, "click")
.pipe(mergeMap(() => ajax.getJSON("/api/data")))
.subscribe(console.log);
3.3 Filtering Operators
- filter → Emits only values matching a condition.
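import { of } from "rxjs";
import { filter } from "rxjs/operators";
of(1, 2, 3, 4)
  .pipe(filter((x) => x % 2 === 0))
  .subscribe(console.log);
- take → Emits only the first N values.
import { of } from "rxjs";
import { take } from "rxjs/operators";
of(1, 2, 3, 4).pipe(take(2)).subscribe(console.log);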
- debounceTime → Waits for a pause in emissions, then emits the most recent value (useful in search).
import { fromEvent } from "rxjs";
import { debounceTime } from "rxjs/operators";
fromEvent(document, "keyup")
.pipe(debounceTime(300))
.subscribe(() => console.log("Search query"));
3.4 Combination Operators
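- merge → Subscribes to all inputs at once and forwards values as they arrive.
import { fromEvent, interval, merge } from "rxjs";
import { take } from "rxjs/operators";
// obs1 and obs2 are reused in the next two examples
const obs1 = interval(1000).pipe(take(3));
const obs2 = fromEvent(document, "click");
merge(obs1, obs2).subscribe(console.log);
- concat → Subscribes to the inputs one at a time, moving on only when the previous one completes.
import { concat } from "rxjs";
concat(obs1, obs2).subscribe(console.log);
- combineLatest → Emits an array of the latest values whenever any input emits, once every input has emitted at least once.
import { combineLatest } from "rxjs";
combineLatest([obs1, obs2]).subscribe(console.log);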
3.5 Error Handling Operators
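- catchError → Replaces an error with a fallback observable.
import { of, throwError } from "rxjs";
import { catchError } from "rxjs/operators";
// The source must actually fail for catchError to do anything
throwError(() => new Error("Request failed"))
  .pipe(catchError(() => of("Error occurred")))
  .subscribe(console.log); // logs "Error occurred"
- retry → Resubscribes to the source when it errors, up to the given number of times.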
import { ajax } from "rxjs/ajax";
import { retry } from "rxjs/operators";
ajax.getJSON("/api/data").pipe(retry(3)).subscribe(console.log);
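The two operators are often combined: retry a few times, then fall back. The sketch below assumes a hypothetical flaky source built with `defer` that fails on its first two subscriptions, so it runs without a network.

```typescript
import { defer, of, throwError } from "rxjs";
import { catchError, retry } from "rxjs/operators";

let attempts = 0;

// Hypothetical flaky source: fails twice, then succeeds.
const flaky$ = defer(() => {
  attempts += 1;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of("payload");
});

const results: string[] = [];
flaky$
  .pipe(
    retry(2), // up to 2 resubscriptions after the first failure
    catchError(() => of("fallback")) // only reached if every attempt fails
  )
  .subscribe((value) => results.push(value));

// A source that never recovers ends up on the fallback path.
const fallbackResults: string[] = [];
throwError(() => new Error("service down"))
  .pipe(
    retry(2),
    catchError(() => of("fallback"))
  )
  .subscribe((value) => fallbackResults.push(value));

console.log(results, fallbackResults, attempts);
```

Placing `catchError` after `retry` matters: in the opposite order the error would be replaced before `retry` ever saw it.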
3.6 Utility Operators
- tap → Side effects like logging.
import { of } from "rxjs";
import { tap } from "rxjs/operators";
of(1, 2, 3).pipe(tap(console.log)).subscribe();
- finalize → Runs cleanup logic when the stream completes, errors, or is unsubscribed.
import { of } from "rxjs";
import { finalize } from "rxjs/operators";
of(1, 2, 3)
  .pipe(finalize(() => console.log("Completed")))
  .subscribe();
4. Best Practices
- Always unsubscribe from infinite streams (or use takeUntil).
- Prefer operators over nested subscriptions.
- Choose the right flattening operator (switchMap, mergeMap, etc.).
- Use Subjects sparingly; prefer BehaviorSubject for state.
- Handle errors gracefully with catchError and retry.
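The takeUntil pattern from the first point can be sketched as follows. `source$` stands in for any infinite stream and `destroy$` is a hypothetical notifier that a component would trigger during teardown.

```typescript
import { Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

const source$ = new Subject<number>(); // stands in for an infinite stream
const destroy$ = new Subject<void>(); // hypothetical "component destroyed" notifier

const received: string[] = [];

source$.pipe(takeUntil(destroy$)).subscribe({
  next: (value) => received.push(`next ${value}`),
  complete: () => received.push("complete"),
});

source$.next(1);
source$.next(2);

// Teardown: one notification ends every stream that was piped through takeUntil(destroy$).
destroy$.next();
destroy$.complete();

source$.next(3); // ignored, the subscription has already completed

console.log(received);
```

A single notifier can close any number of subscriptions, which tends to be easier to maintain than tracking each Subscription by hand.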
5. Advanced Concepts
5.1 Higher-Order Observables & Flattening
- switchMap → Cancels the previous inner observable and keeps only the latest (ideal for search).
import { fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
import { switchMap } from "rxjs/operators";
fromEvent(document, "keyup")
  .pipe(switchMap(() => ajax.getJSON("/api/data")))
  .subscribe(console.log);
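- concatMap → Queues operations and runs them one after another (form saving).
import { fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
import { concatMap } from "rxjs/operators";
fromEvent(document, "submit")
  .pipe(
    concatMap((event) => {
      event.preventDefault(); // keep the browser from reloading the page
      // Build the request body from the form that fired the event
      return ajax.post("/api/save", new FormData(event.target));
    })
  )
  .subscribe(console.log);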
- mergeMap → Runs inner observables concurrently (parallel requests).
import { fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
import { mergeMap } from "rxjs/operators";
fromEvent(document, "click")
  .pipe(mergeMap(() => ajax.getJSON("/api/data")))
  .subscribe(console.log);
- exhaustMap → Ignores new source values until the current inner observable completes (prevents double clicks).
import { fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
import { exhaustMap } from "rxjs/operators";
fromEvent(document, "click")
  .pipe(exhaustMap(() => ajax.getJSON("/api/data")))
  .subscribe(console.log);
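To compare the four flattening operators side by side, here is a sketch that replaces the HTTP call with a hypothetical `request` whose completion is triggered by hand. The `simulate` helper and its log format are assumptions made for this illustration; the scenario is always "click a, click b while a is still pending, then a finishes, then b finishes".

```typescript
import { Observable, Subject } from "rxjs";
import { concatMap, exhaustMap, mergeMap, switchMap } from "rxjs/operators";

type Request = (key: string) => Observable<string>;
type Flatten = (clicks$: Observable<string>, request: Request) => Observable<string>;

function simulate(flatten: Flatten): string[] {
  const log: string[] = [];
  const finishers = new Map<string, () => void>();

  // Hypothetical request: logs when it starts and finishes only when told to.
  const request: Request = (key) =>
    new Observable<string>((subscriber) => {
      log.push(`start ${key}`);
      finishers.set(key, () => {
        subscriber.next(`${key} done`);
        subscriber.complete();
      });
    });

  const clicks$ = new Subject<string>();
  flatten(clicks$, request).subscribe((value) => log.push(value));

  clicks$.next("a");
  clicks$.next("b"); // arrives while request a is still pending
  finishers.get("a")?.(); // request a finishes (if it was started and not cancelled)
  finishers.get("b")?.(); // request b finishes (if it was ever started)
  return log;
}

const outcome = {
  mergeMap: simulate((clicks$, request) => clicks$.pipe(mergeMap(request))),
  switchMap: simulate((clicks$, request) => clicks$.pipe(switchMap(request))),
  concatMap: simulate((clicks$, request) => clicks$.pipe(concatMap(request))),
  exhaustMap: simulate((clicks$, request) => clicks$.pipe(exhaustMap(request))),
};

console.log(outcome);
```

In this scenario mergeMap runs both requests at once, switchMap drops the result of a, concatMap delays the start of b until a is done, and exhaustMap never starts b.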
5.2 Subject Variants
- BehaviorSubject → Holds the latest value and emits it to each new subscriber (good for state management).
import { BehaviorSubject } from "rxjs";
const subject = new BehaviorSubject("Initial");
subject.subscribe(console.log); // logs "Initial" immediately
subject.next("Updated"); // logs "Updated"
- ReplaySubject → Replays past values for new subscribers.
import { ReplaySubject } from "rxjs";
const subject = new ReplaySubject(2); // buffer the last 2 values
subject.next("First");
subject.next("Second");
subject.next("Third");
// Subscribing after the emissions shows the replay: logs "Second", then "Third"
subject.subscribe(console.log);
- AsyncSubject → Emits only the last value when completed.
import { AsyncSubject } from "rxjs";
const subject = new AsyncSubject();
subject.subscribe(console.log);
subject.next("Hello");
subject.complete();
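The variants differ mainly in what a late subscriber receives. The sketch below feeds the same sequence into each one; the `lateSubscriber` helper is a hypothetical function written for this comparison.

```typescript
import { AsyncSubject, BehaviorSubject, ReplaySubject, Subject } from "rxjs";

// Emits two values, subscribes, emits a third, then completes.
function lateSubscriber(subject: Subject<string>): string[] {
  const seen: string[] = [];
  subject.next("first");
  subject.next("second");
  subject.subscribe((value) => seen.push(value));
  subject.next("third");
  subject.complete();
  return seen;
}

const comparison = {
  subject: lateSubscriber(new Subject<string>()),
  behavior: lateSubscriber(new BehaviorSubject<string>("initial")),
  replay: lateSubscriber(new ReplaySubject<string>(2)),
  async: lateSubscriber(new AsyncSubject<string>()),
};

console.log(comparison);
```

The plain Subject delivers only "third", the BehaviorSubject starts with the current value "second", the ReplaySubject replays its buffer of two, and the AsyncSubject delivers "third" only at the moment of completion.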
6. RxJS in Frameworks
This section covers how to integrate RxJS with popular frameworks: Angular, React, React Native, and Vue.
6.1 Angular
- Reactive Forms (Live Search) → debounceTime + switchMap to avoid spamming the API.
import { FormControl } from "@angular/forms";
import { ajax } from "rxjs/ajax";
import { debounceTime, switchMap } from "rxjs/operators";
const searchControl = new FormControl("");
searchControl.valueChanges
  .pipe(
    debounceTime(300),
    // Encode the query so spaces and symbols do not break the URL
    switchMap((query) =>
      ajax.getJSON(`/api/search?q=${encodeURIComponent(query ?? "")}`)
    )
  )
  .subscribe(console.log);
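- State Management → BehaviorSubject in a service to share user state.
import { Injectable } from "@angular/core";
import { BehaviorSubject } from "rxjs";
// Hypothetical shape of the shared user state
interface User {
  id: number;
  name: string;
}
@Injectable({ providedIn: "root" })
export class UserService {
  private userSubject = new BehaviorSubject<User | null>(null);
  // Expose a read-only stream so consumers cannot call next() directly
  user$ = this.userSubject.asObservable();
  setUser(user: User) {
    this.userSubject.next(user);
  }
}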
6.2 React
- Debounced Search → fromEvent + debounceTime + switchMap inside useEffect.
import { useEffect, useRef } from "react";
import { fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
import { debounceTime, switchMap } from "rxjs/operators";
const SearchComponent = () => {
  const inputRef = useRef(null);
  useEffect(() => {
    const subscription = fromEvent(inputRef.current, "input")
      .pipe(
        debounceTime(300),
        switchMap((event) =>
          ajax.getJSON(`/api/search?q=${encodeURIComponent(event.target.value)}`)
        )
      )
      .subscribe(console.log);
    // Unsubscribe when the component unmounts
    return () => subscription.unsubscribe();
  }, []);
  return <input ref={inputRef} />;
};
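- Live Data (Stock Ticker) → interval to poll an endpoint and push UI updates.
import { interval } from "rxjs";
import { ajax } from "rxjs/ajax";
import { switchMap } from "rxjs/operators";
// Poll once per second; switchMap drops a response that is slower than the next tick
const stockPrice$ = interval(1000).pipe(
  switchMap(() => ajax.getJSON("/api/stock-price"))
);
// In a component, subscribe inside useEffect and unsubscribe in its cleanup
stockPrice$.subscribe(console.log);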
6.3 React Native
- Gesture Handling → Subjects fed by PanResponder callbacks expose touch events as streams.
import { useEffect, useRef } from "react";
import { PanResponder, View } from "react-native";
import { Subject, Subscription } from "rxjs";
export default function GestureExample() {
  // PanResponder has no add/remove-listener API, so each callback pushes
  // into a Subject instead of going through fromEvent.
  const start$ = useRef(new Subject()).current;
  const move$ = useRef(new Subject()).current;
  const end$ = useRef(new Subject()).current;
  const panResponder = useRef(
    PanResponder.create({
      onStartShouldSetPanResponder: () => true,
      onPanResponderGrant: (evt, gestureState) => start$.next(gestureState),
      onPanResponderMove: (evt, gestureState) => move$.next(gestureState),
      onPanResponderRelease: (evt, gestureState) => end$.next(gestureState),
    })
  ).current;
  useEffect(() => {
    const subs: Subscription[] = [
      start$.subscribe(() => console.log("Touch start")),
      move$.subscribe(() => console.log("Touch move")),
      end$.subscribe(() => console.log("Touch end")),
    ];
    // Unsubscribe from all three streams on unmount
    return () => subs.forEach((s) => s.unsubscribe());
  }, []);
  return (
    <View
      {...panResponder.panHandlers}
      style={{ flex: 1, backgroundColor: "lightgrey" }}
    />
  );
}
- Network Retry → from + defer + retry for robust API calls.
import { from, defer } from "rxjs";
import { retry } from "rxjs/operators";
// Wrap fetch in a deferred Observable so it runs on subscription
const fetchData = () => {
return defer(() =>
from(
fetch("https://example.com/api/data").then((res) => {
if (!res.ok) throw new Error("Network error");
return res.json();
})
)
).pipe(
retry(3) // retry up to 3 times on failure
);
};
// Usage
fetchData().subscribe({
next: (data) => console.log("Data:", data),
error: (err) => console.error("Error:", err),
});
6.4 Vue
- Complex Fetching → Combine operators to fetch posts + comments + user info.
- Use the @vueuse/rxjs helper package to bridge Observables and Vue's reactivity.
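The "complex fetching" idea is framework-independent, so it can be sketched in plain RxJS. Everything here is hypothetical: the `User`, `Post`, and `Comment` shapes and the three `get...` functions stand in for real API calls and simply return canned data via `of`.

```typescript
import { Observable, forkJoin, of } from "rxjs";
import { map, switchMap } from "rxjs/operators";

interface User { id: number; name: string }
interface Post { id: number; userId: number; title: string }
interface Comment { postId: number; text: string }

// Hypothetical data-access functions; a real app would call its HTTP client here.
const getUser = (id: number): Observable<User> => of({ id, name: "Ada" });
const getPosts = (userId: number): Observable<Post[]> =>
  of([{ id: 10, userId, title: "Hello RxJS" }]);
const getComments = (postIds: number[]): Observable<Comment[]> =>
  of(postIds.map((postId) => ({ postId, text: "Nice post" })));

// User and posts are independent, so forkJoin loads them in parallel.
// Comments depend on the post ids, so they are chained with switchMap.
const profile$ = forkJoin({ user: getUser(1), posts: getPosts(1) }).pipe(
  switchMap(({ user, posts }) =>
    getComments(posts.map((post) => post.id)).pipe(
      map((comments) => ({ user, posts, comments }))
    )
  )
);

const profiles: Array<{ user: User; posts: Post[]; comments: Comment[] }> = [];
profile$.subscribe((profile) => profiles.push(profile));

console.log(profiles[0]);
```

In a Vue component the resulting stream would typically be subscribed to in setup and the value copied into a ref.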
7. Testing RxJS
- Use TestScheduler with virtual time.
- Marble Diagrams describe emissions (a--b--c|).
- Deterministic, instant tests for async streams.
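A minimal marble test might look like the sketch below. The comparison callback is a hand-rolled assumption so the snippet has no test-framework dependency; in a real suite it would usually delegate to the framework's deep-equality assertion.

```typescript
import { TestScheduler } from "rxjs/testing";
import { map } from "rxjs/operators";

const comparisons: unknown[] = [];

// TestScheduler calls this with the actual and expected notification lists.
const scheduler = new TestScheduler((actual, expected) => {
  comparisons.push(actual);
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error("marble mismatch");
  }
});

scheduler.run(({ cold, expectObservable }) => {
  // Each character is one frame of virtual time; "|" marks completion.
  const source$ = cold("a--b--c|", { a: 1, b: 2, c: 3 });
  const result$ = source$.pipe(map((x) => x * 10));

  expectObservable(result$).toBe("a--b--c|", { a: 10, b: 20, c: 30 });
});
```

Inside `run`, time is virtual, so the assertion is evaluated synchronously when the callback returns.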
8. Real-World Use Cases
- Infinite Scroll → scroll or end-reached events + throttleTime + API calls. On the web the source would be fromEvent(window, "scroll"); this React Native version uses FlatList's onEndReached.
import React, { useEffect, useRef, useState } from "react";
import { FlatList, Text } from "react-native";
import { Subject } from "rxjs";
import { concatMap, scan, throttleTime } from "rxjs/operators";
const fetchItems = (page: number) =>
  fetch(`https://example.com/api/items?page=${page}`).then((res) => res.json());
export default function InfiniteScroll() {
  const [items, setItems] = useState<any[]>([]);
  // FlatList has no event-emitter API, so onEndReached pushes into a Subject
  const endReached$ = useRef(new Subject<void>()).current;
  useEffect(() => {
    let page = 1; // lives in the effect closure, so re-renders do not reset it
    const sub = endReached$
      .pipe(
        throttleTime(1000),
        // concatMap loads pages in order; switchMap could cancel a page mid-flight and skip it
        concatMap(() => fetchItems(page++)),
        scan((all: any[], newItems: any[]) => [...all, ...newItems], [] as any[])
      )
      .subscribe(setItems);
    endReached$.next(); // load the first page
    return () => sub.unsubscribe();
  }, []);
  return (
    <FlatList
      data={items}
      keyExtractor={(item, index) => index.toString()}
      renderItem={({ item }) => <Text>{item.name}</Text>}
      onEndReached={() => endReached$.next()}
      onEndReachedThreshold={0.5}
    />
  );
}
- Autocomplete Search → debounceTime + switchMap to keep only the latest request.
import React, { useEffect, useRef, useState } from "react";
import { FlatList, Text, TextInput } from "react-native";
import { Subject, defer, from } from "rxjs";
import { debounceTime, switchMap } from "rxjs/operators";
const searchAPI = (query: string) =>
  defer(() =>
    from(
      fetch(`https://example.com/api/search?q=${encodeURIComponent(query)}`).then(
        (res) => res.json()
      )
    )
  );
export default function Autocomplete() {
  const [results, setResults] = useState<any[]>([]);
  // onChangeText pushes each keystroke into this Subject
  const query$ = useRef(new Subject<string>()).current;
  useEffect(() => {
    // Subscribe once on mount instead of on every render
    const sub = query$
      .pipe(debounceTime(300), switchMap(searchAPI))
      .subscribe(setResults);
    return () => sub.unsubscribe();
  }, []);
  return (
    <>
      <TextInput
        placeholder="Search..."
        onChangeText={(text) => query$.next(text)}
      />
      <FlatList
        data={results}
        keyExtractor={(item, index) => index.toString()}
        renderItem={({ item }) => <Text>{item.name}</Text>}
      />
    </>
  );
}
- WebSocket Chat → Subject to send messages, Observables for receiving.
import { Subject } from "rxjs";
import { webSocket } from "rxjs/webSocket";
const chatSubject = new Subject<string>();
// Connect to WebSocket server
const socket$ = webSocket("wss://example.com/chat");
// Send messages
chatSubject.subscribe((msg) => socket$.next(msg));
// Receive messages
socket$.subscribe({
next: (msg) => console.log("Received:", msg),
});
// Usage
chatSubject.next("Hello everyone!");
- Form Autosave → debounceTime + concatMap to save in sequence.
import { Subject } from "rxjs";
import { debounceTime, concatMap } from "rxjs/operators";
const formChanges$ = new Subject<any>();
const saveForm = (data: any) =>
fetch("https://example.com/api/save", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(data),
}).then((res) => res.json());
formChanges$
.pipe(
debounceTime(500),
concatMap(saveForm) // ensures sequential saves
)
.subscribe((response) => console.log("Form saved:", response));
// Example usage: both changes arrive within 500 ms, so only the last one is saved
formChanges$.next({ name: "John" });
formChanges$.next({ name: "John Doe" });
9. Conclusion
RxJS is a must-learn skill for developers working with Angular, React, React Native, or Vue. It provides:
- Cleaner async handling
- Declarative stream processing
- Robust error handling
- Scalable state management
- Optimized performance
- Strong testing capabilities
By mastering observables, operators, and best practices, you can build responsive, efficient, and maintainable apps.
10. Further Reading