RxJS | ReactiveX | @ GitHub
RxJS is a (Microsoft) library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects), and operators (map, filter, reduce, every, etc.) that allow asynchronous events to be handled as collections.
Think of RxJS as Lodash for events.
ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.
Concepts in RxJS which solve async event management are:
Observable: represents the idea of an invokable collection of future values or events.
Observer: a collection of callbacks (listeners) that handle the values delivered by the Observable. It has the methods next, error and complete; each may invoke its callback (subscription function).
Schedulers: centralized dispatchers to control concurrency; setTimeout, requestAnimationFrame, etc.
Subject: the equivalent of an EventEmitter, and the only way of multicasting a value or event to multiple Observers.
Subscription: represents the execution of an Observable; a handle (like an ID) whose unsubscribe() method cancels the execution.
Operators: pure functions (FP) for processing collections; map, filter, concat, reduce, etc.
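The Subject idea (one source multicasting to several Observers, each with its own Subscription) can be sketched in plain JavaScript. This is only an illustration under assumptions: MiniSubject is a hypothetical name, and the code is not the RxJS Subject class.

```javascript
// Minimal sketch of the Subject idea: one source, many observers.
// MiniSubject is a hypothetical name; this is not the RxJS Subject class.
class MiniSubject {
  constructor() {
    this.observers = new Set();
  }

  // Register an observer and return a subscription handle.
  subscribe(observer) {
    this.observers.add(observer);
    return { unsubscribe: () => this.observers.delete(observer) };
  }

  // Multicast: every registered observer receives the same value.
  next(value) {
    for (const observer of [...this.observers]) observer.next(value);
  }
}

const subject = new MiniSubject();
const seenByA = [];
const seenByB = [];

const subA = subject.subscribe({ next: v => seenByA.push(v) });
subject.subscribe({ next: v => seenByB.push(v) });

subject.next(1);      // delivered to A and B
subA.unsubscribe();   // A stops listening
subject.next(2);      // delivered to B only

console.log(seenByA); // [ 1 ]
console.log(seenByB); // [ 1, 2 ]
```

Both observers see the first value because the subject holds a list of listeners, much like an EventEmitter does.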
Observables and Observers
OBSERVABLE source -> OBSERVER -> CALLBACK -> OBSERVABLE
The Rx.Observable class has many functions for creating observables from different kinds of data and streams: events, event patterns, arrays, promises, single or multiple values, and any kind of data structure or primitive data type.
// From DOM events
var clkOable = Rx.Observable.fromEvent(document, 'click')
clkOable.subscribe(aSubscrpnFn)
// From an array
var arrOable = Rx.Observable.from(array)
arrOable.subscribe(aSubscrpnFn)
// From a promise
var ajxOable = Rx.Observable.fromPromise(fetch(aURL))
ajxOable.subscribe(respFn, errFn)
// An Observer has 3 methods
var subscrpn = someOable.subscribe({
  next: x => gotFn(x),
  error: err => errFn(err),
  complete: () => console.log('done'),
})
// Unsubscribe: call unsubscribe() on the Subscription returned by subscribe(), not on the Observable
subscrpn.unsubscribe()
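To show how an event-based observable and its Subscription relate, here is a rough sketch in plain JavaScript. It assumes a hypothetical fakeTarget object standing in for a DOM node and a hypothetical fromEventSketch helper; neither is part of RxJS, and the real fromEvent does more.

```javascript
// Hypothetical stand-in for a DOM node so the sketch runs without a browser.
// For brevity it keeps one handler set and ignores the event type.
const fakeTarget = {
  handlers: new Set(),
  addEventListener(type, handler) { this.handlers.add(handler); },
  removeEventListener(type, handler) { this.handlers.delete(handler); },
  dispatch(event) { for (const h of [...this.handlers]) h(event); },
};

// Sketch of a fromEvent-style creator (not the RxJS implementation).
function fromEventSketch(target, type) {
  return {
    subscribe(observer) {
      const handler = event => observer.next(event);
      target.addEventListener(type, handler);
      // The returned Subscription, not the Observable, carries unsubscribe().
      return { unsubscribe: () => target.removeEventListener(type, handler) };
    },
  };
}

const clicks = fromEventSketch(fakeTarget, 'click');
const received = [];
const subscription = clicks.subscribe({ next: e => received.push(e.id) });

fakeTarget.dispatch({ id: 1 });
subscription.unsubscribe();
fakeTarget.dispatch({ id: 2 }); // ignored: the listener was removed

console.log(received); // [ 1 ]
```

Nothing is attached to the target until subscribe() is called, and unsubscribing removes exactly the listener that this subscription added.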
Operators (Consumer & Emitter)
source OBSERVABLE -> OPERATOR FUNCTION -> destination OBSERVABLE
var arrOable = Rx.Observable.from([1, 2, 3, 4, 5])
arrOable
  .map(n => n * 2)
  .reduce((sum, n) => sum + n, 0)
  .subscribe(function(n) {
    console.log(n)
  }, function(err) {
    console.error(err)
  }, function() {
    console.log('done')
  })
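With RxJS 6 (@ https://unpkg.com/rxjs@6.5.2/bundles/rxjs.umd.min.js), the chained methods above fail; e.g., arrOable.map(..) fails because operators are no longer methods on the Observable.
A different syntax is required: creation functions live on the global rxjs object, and operators live under rxjs.operators and are applied through pipe().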
arrOable = rxjs.from([1, 2, 3, 4, 5])
arrOable
  .pipe(rxjs.operators.map(n => 2 * n))
  .pipe(rxjs.operators.reduce((sum, n) => sum + n, 0))
  .subscribe(v => console.log(v))
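The "source OBSERVABLE -> OPERATOR FUNCTION -> destination OBSERVABLE" flow behind pipe() can be approximated in a few lines of plain JavaScript. This is a sketch under assumptions: fromArray, map, reduce and pipe below are hypothetical stand-ins written for illustration, not the RxJS source, and error handling is left out.

```javascript
// Sketch of pipeable operators in plain JavaScript (hypothetical helpers,
// not the RxJS source). An "observable" here is any object with subscribe().
const fromArray = items => ({
  subscribe(observer) {
    items.forEach(item => observer.next(item));
    observer.complete();
  },
});

// An operator is a function: source observable -> destination observable.
const map = project => source => ({
  subscribe(observer) {
    source.subscribe({
      next: value => observer.next(project(value)),
      complete: () => observer.complete(),
    });
  },
});

const reduce = (accumulate, seed) => source => ({
  subscribe(observer) {
    let acc = seed;
    source.subscribe({
      next: value => { acc = accumulate(acc, value); },
      // Emit the single accumulated result once the source completes.
      complete: () => { observer.next(acc); observer.complete(); },
    });
  },
});

// pipe() threads the source through each operator, left to right.
const pipe = (source, ...operators) =>
  operators.reduce((current, operator) => operator(current), source);

const results = [];
pipe(fromArray([1, 2, 3, 4, 5]), map(n => n * 2), reduce((sum, n) => sum + n, 0))
  .subscribe({ next: v => results.push(v), complete: () => results.push('done') });

console.log(results); // [ 30, 'done' ]
```

Because each operator only wraps the previous observable, operators can live as standalone functions instead of methods on the Observable, which is why the chained .map(..) style is not needed.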
RxJS.js
obs = Rx.Observable.create(fn)
Method to create a custom observable; returns an object that has a subscribe method.
obs.subscribe(obj)
The subscribe method takes the observer object (obj) as a parameter. The observer has the methods next, error and complete, which fn may call when the observable emits:
{
next: (v) => console.log('Got ' + v),
error: (err) => console.error('ERR: ' + err),
complete: () => console.log('done'),
}
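How a create(fn)-style factory hands the observer to fn can be sketched as follows. This is a simplified illustration, assuming a hypothetical createSketch helper; it is not how RxJS implements Rx.Observable.create, which also handles teardown logic and error trapping.

```javascript
// Sketch of what a create(fn)-style factory does (hypothetical helper,
// not the RxJS implementation).
function createSketch(fn) {
  return {
    subscribe(observer) {
      let closed = false;
      // Wrap the observer so nothing is delivered after error/complete.
      const safeObserver = {
        next: v => { if (!closed) observer.next(v); },
        error: err => { if (!closed) { closed = true; observer.error(err); } },
        complete: () => { if (!closed) { closed = true; observer.complete(); } },
      };
      fn(safeObserver); // the producer runs once per subscribe() call
      return { unsubscribe: () => { closed = true; } };
    },
  };
}

const log = [];
const obs = createSketch(observer => {
  observer.next('A');
  observer.next('B');
  observer.complete();
  observer.next('C'); // dropped: the stream already completed
});

obs.subscribe({
  next: v => log.push('Got ' + v),
  error: err => log.push('ERR: ' + err),
  complete: () => log.push('done'),
});

console.log(log); // [ 'Got A', 'Got B', 'done' ]
```

The producer function does nothing until subscribe() is called, which is what makes the observable "invokable" rather than an eagerly running stream.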