Getting Started with Reactive Programming with RxJS in JavaScript

Article By Digamber Rawat Published on


What is Reactive Programming?

Reactive Programming is an asynchronous programming paradigm concerned with data streams and the propagation of change.
– Wikipedia


In this article, we are going to talk about the basics of Reactive programming. ReactiveX is the most popular collection of APIs for Reactive programming, and it offers a dedicated library for each supported programming language. RxJS is its well-known library for JavaScript.

Reactive Programming manages asynchronous data streams using the following building blocks:

  • Stream
  • Observable
  • Observer
  • Subscribe
  • Unsubscribe
  • Operators
  • Subject

Let’s go through the core concepts of Functional Reactive Programming with RxJS step by step.

What is Stream in RxJS?

A stream in RxJS is a sequence of ongoing events ordered in time, such as clicks, key presses or HTTP responses, that an application processes to accomplish a task.

What is an Observable in RxJS?

Observables are used to handle asynchronous operations in RxJS. In Reactive programming, an observer subscribes to an Observable. The Observable emits a stream of data, and the observer listens to it and responds appropriately.

Let’s understand Observables by creating a small demo.

I’ll be creating a MyObservable$ variable with a dollar symbol; the $ suffix indicates that it is an Observable. It’s not a universal rule, but when you work on an enterprise-level app with hundreds of variables, the suffix makes the Observables easy to recognize.

The Observer responds to the Observable through its next, error and complete methods.

For demo purposes, we are going to use stackblitz.com.

import { Observable } from 'rxjs';

// Declare the Observable; this function runs once for each subscriber
const MyObservable$ = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

// Subscribe to MyObservable$ with an observer object
MyObservable$.subscribe({
  next: (result) => console.log(result),
  error: (error) => console.log('Error occurred: ' + error),
  complete: () => console.log('Task completed'),
});


What is an Observer in ReactiveX?

An observer is a collection of callback methods: next(), error() and complete(). An observer subscribes to an Observable and listens to the data the Observable emits.

Let’s look at the Observer’s three callback methods.

next():

Once we subscribe to an Observable, the Observable calls the observer’s next() method each time it emits a new value.

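import { Observable } from 'rxjs';

const helloWorld$ = new Observable((observer) => {
  // Emit a single value to the subscriber
  observer.next(1);
});
helloWorld$.subscribe({
  // Runs once for every emitted value
  next: (result) => console.log('Hello World'),
});
// Logs: Hello World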

error():

When an error occurs, the Observable calls the observer’s error() method. The Observable then stops immediately: no further values are emitted, and complete() is never called.

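import { Observable } from 'rxjs';

const helloWorld$ = new Observable((observer) => {
  observer.next(1);
  // Signal a failure; the Observable stops here
  observer.error(new Error('Something went wrong'));
});
helloWorld$.subscribe({
  next: (result) => console.log('Hello World'),
  error: (err) => console.log('Error occurred'),
  // Never called, because the Observable ended with an error
  complete: () => console.log('Task completed!'),
});
// Logs: Hello World, then Error occurred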
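The rule described above (once error() fires, nothing else is delivered) can be pictured with a small sketch that needs no library. This is only a toy model and not how RxJS is actually implemented; the SimpleObserver interface and the guard helper are hypothetical names that I made up for illustration.

```typescript
// Toy model of the observer contract; this is not RxJS code.
interface SimpleObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Hypothetical helper: wraps an observer so that nothing is
// delivered after error() or complete() has been called.
function guard<T>(observer: SimpleObserver<T>): SimpleObserver<T> {
  let stopped = false;
  return {
    next: (value) => {
      if (!stopped) observer.next(value);
    },
    error: (err) => {
      if (stopped) return;
      stopped = true;
      observer.error(err);
    },
    complete: () => {
      if (stopped) return;
      stopped = true;
      observer.complete();
    },
  };
}

const received: string[] = [];
const guarded = guard<number>({
  next: (value) => received.push(`next ${value}`),
  error: (err) => received.push(`error ${String(err)}`),
  complete: () => received.push('complete'),
});

guarded.next(1);
guarded.error('boom');
guarded.next(2); // ignored: the stream has already failed
guarded.complete(); // ignored as well

// received now holds 'next 1' followed by 'error boom'
console.log(received);
```

Only the first value and the error reach the observer, which matches the behaviour described for error() above.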

complete():

When we subscribe to an Observable, it keeps emitting values. When there are no values left to emit, the Observable calls the observer’s complete() method.

import { Observable } from 'rxjs';

const helloWorld$ = new Observable((observer) => {
  observer.next(1);
  // No more values to emit, so signal completion
  observer.complete();
});
helloWorld$.subscribe({
  next: (result) => console.log('Hello World'),
  complete: () => console.log('Task completed!'),
});
// Logs: Hello World, then Task completed!

Understand Subscribe in RxJS

subscribe() is the method that executes an Observable in ReactiveX. It connects the Observable to an observer, which handles the emitted data through its next(), error() and complete() methods.

Calling subscribe() starts the Observable’s execution and delivers the data it emits to the observer.

import { Observable } from 'rxjs';

// Declare the Observable; nothing runs until subscribe() is called
const avenger$ = new Observable((observer) => {
  observer.next('EndGame is near');
});

// Subscribing starts the execution and logs the emitted value
const subscribeAvenger = avenger$.subscribe((result) => {
  console.log(result);
});

What is unsubscribe() in Reactive Programming?

In Reactive programming, when you subscribe to an Observable, you should unsubscribe once its values are no longer needed; otherwise a long-lived subscription can lead to a memory leak.

We can unsubscribe from an Observable or event in RxJS by calling the unsubscribe() method, which prevents such leaks.

How to use the unsubscribe() method in ReactiveX?

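import { Observable } from 'rxjs';

// Declare an Observable that emits every second until it is torn down
const avenger$ = new Observable((observer) => {
  const id = setInterval(() => observer.next('EndGame is near'), 1000);
  // Teardown logic: runs when the subscriber unsubscribes
  return () => clearInterval(id);
});

// Subscription method
const subscribeAvenger = avenger$.subscribe((result) => {
  console.log(result);
});

// Unsubscribe after a few seconds to stop the timer and prevent a memory leak
setTimeout(() => subscribeAvenger.unsubscribe(), 3500);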

Introduction to Operators in RxJS

Operators are functions. In RxJS they enable a functional reactive programming style for manipulating streams of data, for example map, reduce, filter and concat.
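If you have used array methods in JavaScript, this style will look familiar. The sketch below uses plain arrays only (no RxJS is involved) to show the same filter, map and reduce idea; the variable names are made up for illustration. The difference in RxJS is that the values arrive over time instead of being available all at once.

```typescript
// Plain array analogy, not RxJS: each step is a small pure function
const numbers: number[] = [2, 3, 4, 5, 6];

const evenSquares = numbers
  .filter((n) => n % 2 === 0) // keep the even numbers: 2, 4, 6
  .map((n) => n * n); // square each one: 4, 16, 36

// reduce folds the collection into a single value
const total = evenSquares.reduce((sum, n) => sum + n, 0);

console.log(evenSquares); // 4, 16, 36
console.log(total); // 56
```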

The RxJS library offers plenty of operators for handling all kinds of situations in functional reactive programming, but I am going to mention only the most commonly used ones here.

  • Combination
  • Creation
    • from
    • of
  • Error Handling
    • catch / catchError
  • Filtering
    • debounceTime
    • distinctUntilChanged
    • filter
    • take
    • takeUntil
  • Multicasting
    • share
    • shareReplay
  • Transformation
    • bufferTime
    • concatMap
    • map
    • mergeMap / flatMap
    • scan
    • switchMap
  • Utility
    • do / tap
    • finalize / finally

Operators Examples in Reactive Programming

In this section, I am going to share with you how you can use some of the operators with RxJS.

Here is a simple example of the of() and map() operators.

import { of } from 'rxjs'; 
import { map } from 'rxjs/operators';
const nums = of(5, 6, 7, 8);
const multiplyVal = map((result: number) => result * result);
const calculateVal = multiplyVal(nums);
calculateVal.subscribe(x => console.log(x));
// Logs
// 25
// 36
// 49
// 64


Here is a simple example of pipe() with the filter() and map() operators.

import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Keep the even numbers, then square each of them
const squareEven = of(2, 3, 4, 5, 6).pipe(
  filter((n) => n % 2 === 0),
  map((n) => n * n)
);
// Subscribe and get values
squareEven.subscribe((x) => console.log(x));
// Logs
// 4
// 16
// 36

Subject in Reactive Programming

In RxJS, a Subject is a special kind of Observable that allows a value to be multicast to multiple Observers. A Subject works like an EventEmitter: it keeps a registry of many listeners at once.

A Subject is also an Observer, so it has the next(), error() and complete() methods. A plain Observable is unicast by default, which is the main reason we use a Subject to multicast values.

import { Subject } from 'rxjs';

const mySubject = new Subject<number>();
// Both observers are registered on the same Subject
mySubject.subscribe({
  next: (v) => console.log('observer A: ' + v),
});
mySubject.subscribe({
  next: (v) => console.log('observer B: ' + v),
});
// Each value is delivered to every observer
mySubject.next(1);
mySubject.next(2);
// Logs:
// observer A: 1
// observer B: 1
// observer A: 2
// observer B: 2


Let’s understand the difference between unicasting and multicasting!

Unicasting in RxJS

In unicasting, each subscribed observer gets its own independent execution of the Observable.
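To see why each subscriber gets its own execution, here is a dependency-free sketch. It is a toy model under my own assumptions, not RxJS’s real implementation: ToyObservable, Listener and counter$ are hypothetical names used only for this illustration.

```typescript
// Toy model of a unicast Observable; this is not RxJS code.
type Listener<T> = (value: T) => void;

class ToyObservable<T> {
  // The producer is stored, not run, when the Observable is created
  private readonly producer: (listener: Listener<T>) => void;

  constructor(producer: (listener: Listener<T>) => void) {
    this.producer = producer;
  }

  // Every subscribe() call runs the producer again for that subscriber only
  subscribe(listener: Listener<T>): void {
    this.producer(listener);
  }
}

let executions = 0;
const counter$ = new ToyObservable<number>((listener) => {
  executions += 1; // counts how many times the producer has run
  listener(executions);
});

const seenByA: number[] = [];
const seenByB: number[] = [];
counter$.subscribe((value) => seenByA.push(value));
counter$.subscribe((value) => seenByB.push(value));

console.log(executions); // 2: one execution per subscriber
console.log(seenByA, seenByB); // A saw 1, B saw 2
```

The two subscribers receive different values because the producer ran twice, once for each of them.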

Multicasting in RxJS

In multicasting, a single execution of the Observable is shared among multiple subscribers.
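The shared execution can be sketched without any library as well. Again, this is a toy model and not the real RxJS Subject: ToySubject, Listener and runProducer are hypothetical names that I introduce only to show the idea of one producer run fanning out to every listener.

```typescript
// Toy model of a multicasting Subject; this is not RxJS code.
type Listener<T> = (value: T) => void;

class ToySubject<T> {
  private readonly listeners: Listener<T>[] = [];

  // subscribe() only registers the listener; it does not start a new execution
  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  // next() delivers the same value to every registered listener
  next(value: T): void {
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

let executions = 0;
const subject = new ToySubject<number>();
const seenByA: number[] = [];
const seenByB: number[] = [];
subject.subscribe((value) => seenByA.push(value));
subject.subscribe((value) => seenByB.push(value));

// A single producer run feeds the subject, which fans the value out
function runProducer(): void {
  executions += 1;
  subject.next(executions);
}
runProducer();

console.log(executions); // 1: a single shared execution
console.log(seenByA, seenByB); // both saw 1
```

Here both subscribers see the same value, because the producer ran only once and the subject passed its result to everyone.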

That’s it for now.

Thanks a lot for taking the time to read this article. I hope it has been helpful to you.

Now you can understand the basic concepts of RxJS like Observable, Observer, Subscription, Unsubscription, Operators, and Subject. In the near future, I will be writing detailed articles about all the reactive programming concepts and how they work. If you have any suggestions or feedback for me, you can mention them in the comment section below.

Have a good day, keep learning!

Digamber Rawat

I am a software engineer from India, love to learn and write about latest web and mobile technologies like: MongoDB, Angular 2+, Firebase, Express JS, Python, Node JS, JavaScript, RxJS etc.