RxJS | ReactiveX | @ GitHub
RxJS is a (Microsoft) library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators (map, filter, reduce, every, etc.) to allow handling asynchronous events as collections.
Think of RxJS as Lodash for events.
ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.
Concepts in RxJS which solve async event management are:
Observable: represents the idea of an invokable collection of future values or events.
Observer: a collection of callbacks (listeners); handlers of values delivered by the Observable; has methods next, error and complete; each may invoke its callback (subscription function).
Schedulers: centralized dispatchers to control concurrency; setTimeout, requestAnimationFrame, etc.
Subject: the equivalent to an EventEmitter, and the only way of multicasting a value or event to multiple Observers.
Subscription: represents the execution of an Observable; an ID for cancelling the execution.
Operators: pure functions (FP) for processing collections; map, filter, concat, reduce, etc.
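To make the Subject and Subscription entries above concrete, here is a minimal sketch in plain JavaScript. It is a toy stand-in, not the RxJS implementation: createToySubject and everything inside it are hypothetical names invented for illustration, and only the next path is modelled.

```javascript
// Toy stand-in for a Subject (NOT the RxJS implementation): it keeps a list
// of observers and forwards every value to all of them.
function createToySubject() {
  const observers = [];
  return {
    // Register an observer; the returned object plays the role of a Subscription.
    subscribe(observer) {
      observers.push(observer);
      return {
        unsubscribe() {
          const i = observers.indexOf(observer);
          if (i !== -1) observers.splice(i, 1);
        },
      };
    },
    // Multicast: deliver the value to every observer currently subscribed.
    next(value) {
      observers.slice().forEach(o => o.next(value));
    },
  };
}

const received = [];
const subject = createToySubject();
const subA = subject.subscribe({ next: v => received.push('A:' + v) });
const subB = subject.subscribe({ next: v => received.push('B:' + v) });

subject.next(1);      // both A and B receive 1
subA.unsubscribe();   // cancel A's execution only
subject.next(2);      // only B receives 2

console.log(received); // [ 'A:1', 'B:1', 'B:2' ]
```

The object returned by subscribe is what cancels delivery; the subject itself keeps running for the remaining observers.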
Observables and Observers
OBSERVABLE source -> OBSERVER -> CALLBACK -> OBSERVABLE
The Rx.Observable class has many functions to create observables from different kinds of data/streams: events, event patterns, arrays, promises, single/multiple values, any kind of data structure or primitive data type.
var clkOable = Rx.Observable.fromEvent(document, 'click')
clkOable.subscribe(aSubscrpnFn)
var arrOable = Rx.Observable.from(array)
arrOable.subscribe(aSubscrpnFn)
var ajxOable = Rx.Observable.fromPromise(fetch(aURL))
ajxOable.subscribe(respFn, errFn)
// An Observer has 3 methods
var subscrpn = someOable.subscribe({
  next: x => gotFn(x),
  error: err => errFn(err),
  complete: () => console.log('done'),
})
// Unsubscribe: call it on the Subscription returned by subscribe(), not on the Observable
subscrpn.unsubscribe()
Operators (Consumer & Emitter)
source OBSERVABLE -> OPERATOR FUNCTION -> destination OBSERVABLE
var arrOable = Rx.Observable.from([1, 2, 3, 4, 5])
arrOable
  .map(n => n * 2)
  .reduce((sum, n) => sum + n, 0)
  .subscribe(function(n) {
    console.log(n)
  }, function(err) {
    console.error(err)
  }, function() {
    console.log('done')
  })
@ https://unpkg.com/rxjs@6.5.2/bundles/rxjs.umd.min.js
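With this RxJS 6 bundle the above methods fail, e.g., arrOable.map(..) fails: operators are no longer methods on the Observable, and the global object is rxjs rather than Rx.
A different syntax is required: creation functions such as rxjs.from, and operators from rxjs.operators passed to pipe().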
arrOable = rxjs.from([1, 2, 3, 4, 5])
arrOable
  .pipe(rxjs.operators.map(n => 2 * n))
  .pipe(rxjs.operators.reduce((sum, n) => sum + n, 0))
  .subscribe(v => console.log(v))
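The "source OBSERVABLE -> OPERATOR FUNCTION -> destination OBSERVABLE" idea behind pipe() can be sketched without the library. The following is a toy model in plain JavaScript, not RxJS code: toyObservable, toyFrom, toyMap and toyReduce are hypothetical helpers invented for illustration, and error handling is left out.

```javascript
// Toy observable (NOT RxJS): an object with subscribe(observer) and pipe(...ops).
function toyObservable(subscribeFn) {
  return {
    subscribe: subscribeFn,
    // pipe feeds the source through each operator in turn.
    pipe(...operators) {
      return operators.reduce((source, op) => op(source), this);
    },
  };
}

// Emits each array item synchronously, then completes.
const toyFrom = items => toyObservable(observer => {
  items.forEach(x => observer.next(x));
  observer.complete();
});

// map: a function from a source observable to a destination observable.
const toyMap = project => source => toyObservable(observer =>
  source.subscribe({
    next: x => observer.next(project(x)),
    complete: () => observer.complete(),
  }));

// reduce: accumulates silently and emits one value when the source completes.
const toyReduce = (accumulate, seed) => source => toyObservable(observer => {
  let acc = seed;
  source.subscribe({
    next: x => { acc = accumulate(acc, x); },
    complete: () => { observer.next(acc); observer.complete(); },
  });
});

const output = [];
toyFrom([1, 2, 3, 4, 5])
  .pipe(toyMap(n => 2 * n), toyReduce((sum, n) => sum + n, 0))
  .subscribe({ next: v => output.push(v), complete: () => output.push('done') });

console.log(output); // [ 30, 'done' ]
```

Each operator leaves its source untouched and returns a new observable, which is why operators can be chained in any order inside a single pipe call.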
RxJS.js
obs = Rx.Observable.create(fn)
Method to create a custom observable; returns an object with a subscribe method on it.
obs.subscribe(obj)
The subscribe method takes the observer object (obj) as a param. The observer has the methods next, error and complete, which fn may call when the observable emits a value, fails or completes:
{
  next: (v) => console.log('Got ' + v),
  error: (err) => console.error('ERR: ' + err),
  complete: () => console.log('done'),
}
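How fn and the observer object fit together can be sketched in plain JavaScript. This is a toy model of a create-style helper, not the RxJS implementation: toyCreate is a hypothetical name, and the "closed" guard is an assumption about sensible behaviour (nothing is delivered after error or complete).

```javascript
// Toy version of a "create" helper (NOT RxJS): fn receives the observer and
// decides when to call next, error and complete.
function toyCreate(fn) {
  return {
    subscribe(observer) {
      let closed = false;
      // Wrap the observer so nothing is delivered after error or complete.
      const safe = {
        next: v => { if (!closed) observer.next(v); },
        error: e => { if (!closed) { closed = true; observer.error(e); } },
        complete: () => { if (!closed) { closed = true; observer.complete(); } },
      };
      try {
        fn(safe);
      } catch (e) {
        safe.error(e); // an exception thrown by fn is routed to the error callback
      }
    },
  };
}

const log = [];
const obs = toyCreate(observer => {
  observer.next('a');
  observer.next('b');
  observer.complete();
  observer.next('ignored'); // dropped: the stream already completed
});

obs.subscribe({
  next: v => log.push('Got ' + v),
  error: err => log.push('ERR: ' + err),
  complete: () => log.push('done'),
});

console.log(log); // [ 'Got a', 'Got b', 'done' ]
```

Nothing runs until subscribe is called; each call to subscribe invokes fn again with that subscriber's observer.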