ReactiveX | Part II: Observable & Observer from scratch
10 Oct 2020The best way to have a solid understanding about a problem is to build things from bottom up. In this post, we’ll implement the core concept of Rx from scratch, step by step, but without optimization & non-essential parts.
The observables/publishers are the things that can be observed. They “push” data to their consumers. The WeatherStation
from previous part is an example for observable.
The observers/subscribers is the listeners. They consume the data and react to these. In previous part, the PC
, TV
and SmartPhone
classes are observers: they receive data from WeatherStation
and display it.
Observable is no more than a function. Let’s create a function named myObservable
:
function myObservable(observer) {}
Observer is no more than an object has 3 methods: next
to get the value, error
to get the error and complete
takes no args
const observer = {
next(value) { console.log('next -> ', value) }
error(err) {}
complete() { console.log('complete') }
}
Now let’s put some logics into myObservable
function myObservable(observer) {
for (let i = 0; i < 10; i ++) {
observer.next(i)
}
observer.complete();
}
myObservable(observer)
The output should look like this
next -> 0
next -> 1
next -> 2
next -> 3
next -> 4
next -> 5
next -> 6
next -> 7
next -> 8
next -> 9
complete
This is basic observable.
What if some async tasks need to be executed inside?
Let’s update myObservable
:
function myObservable(observer) {
const id = setTimeout(() => {
observer.next('hello')
observer.complete()
}, 1000)
return () => clearTimeout(id)
}
myObservable(observer)
“hello” and then “complete” will be shown after a second. Also notice that myObservable
now returns an unsubscription function to cancel the async task.
const unsub = myObservable(observer);
unsub();
With this update, the async task is cancelled before the “hello” appears, pretty cool 😎
But there are still have some problems. Let’s change the code:
function myObservable(observer) {
for (let i = 0; i < 10; i++ ){
observer.next(i)
}
observer.complete()
observer.next('haha')
}
const observer = {
next(value) { console.log('next -> ', value) }
error(err) {}
complete() { console.log('complete') }
}
myObservable(observer)
The problem is when observer is completed, the next
method still could be called, same problem with error
method. Let’s have a class named SafeObserver
class SafeObserver {
constructor(observer) {
this.observer = observer;
this.isUnsubscribed = false;
}
next(value) {
if (this.isUnsubscribed) {
return;
}
if (this.observer && this.observer.next) {
this.observer.next(value);
}
}
error(err) {
if (this.isUnsubscribed) {
return;
}
this.isUnsubscribed = true;
this.observer.error(err);
}
complete() {
if (this.isUnsubscribed) {
return;
}
this.isUnsubscribed = true;
this.observer.complete();
}
}
const safeObserver = new SafeObserver(observer);
myObservable(safeObserver);
Now there is no more “next” when the observer is completed.
We just built observable & observer from scratch. In real world, RxJS provides us with creation operations to easily generate Observables for a lot of sync & async case.