Class notes: Build Your Own RxJS Pipeable Operators

Put that in your `pipe` and smoke it!


I took John Lindquist's online video course, Build Your Own RxJS Pipeable Operators, which comprises 12 videos totaling 31 minutes. Yep, that is fast! I can recommend the course if you're interested in RxJS 6, but you do need some prior experience with RxJS, and Lindquist moves so quickly that you will be pausing frequently.

The course covers what it says on the tin: how to construct your own RxJS pipeable operators, which I found useful both for getting a grasp on RxJS and for writing more expressive RxJS code. These are my class notes. The meat of this post, and of the course, is in these pieces of TypeScript code:

import { Observable, Subscriber } from "rxjs"

const makeRxJSOperator = (Type: any) => (source: Observable<any>) =>
  source.lift({
    call(sub, source) {
      // Return the subscription, as RxJS's built-in operators do, so teardown is passed along
      return source.subscribe(new Type(sub))
    },
  })
// This function makeRxJSOperator is my own twist on the instructor's main trick, which is to use
// `lift` to subscribe an instance of a Subscriber class to the source Observable

const yourNewPipeOperator = makeRxJSOperator(SubscriberClass)
// yourNewPipeOperator is now a function which takes an Observable and returns another Observable
// which you can use like this:
inputStream$
  .pipe(yourNewPipeOperator, anotherPipeOperator)
  .subscribe((x) => console.log("dataresult:", x))
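
To see what the `lift` trick does without pulling in RxJS, here is a dependency-free toy model. Everything in it (`Sink`, `MiniObservable`, `makeMiniOperator`, `Doubler`) is a hypothetical stand-in invented for illustration, not RxJS's actual implementation. It only sketches the idea: `lift` returns a new observable which, when subscribed to, places a fresh subscriber instance between the source and whatever is downstream.

```typescript
// Hypothetical stand-in for an RxJS Subscriber/Observer.
interface Sink<T> {
  next(value: T): void
  complete(): void
}

// Hypothetical, heavily simplified stand-in for an RxJS Observable.
class MiniObservable<T> {
  private producer: (sink: Sink<T>) => void

  constructor(producer: (sink: Sink<T>) => void) {
    this.producer = producer
  }

  subscribe(sink: Sink<T>): void {
    this.producer(sink)
  }

  // Returns a new observable. On subscription, `wrap` builds a sink that sits
  // in front of the downstream sink, and that new sink is subscribed to this source.
  lift<R>(wrap: (downstream: Sink<R>) => Sink<T>): MiniObservable<R> {
    return new MiniObservable<R>((downstream) => this.subscribe(wrap(downstream)))
  }
}

// Toy analogue of makeRxJSOperator: turns a sink class into an operator function.
const makeMiniOperator =
  <T, R>(Type: new (downstream: Sink<R>) => Sink<T>) =>
  (source: MiniObservable<T>): MiniObservable<R> =>
    source.lift<R>((downstream) => new Type(downstream))

// A sink class that doubles each value before passing it on.
class Doubler implements Sink<number> {
  private downstream: Sink<number>

  constructor(downstream: Sink<number>) {
    this.downstream = downstream
  }

  next(value: number): void {
    this.downstream.next(value * 2)
  }

  complete(): void {
    this.downstream.complete()
  }
}

const double = makeMiniOperator<number, number>(Doubler)

const source = new MiniObservable<number>((sink) => {
  sink.next(1)
  sink.next(2)
  sink.next(3)
  sink.complete()
})

const log: string[] = []
double(source).subscribe({
  next: (value) => {
    log.push(`next:${value}`)
  },
  complete: () => {
    log.push("complete")
  },
})
// log is now ["next:2", "next:4", "next:6", "complete"]
```

Note that nothing runs until `subscribe` is called on the result of `double(source)`; the `Doubler` instance is created at that moment, once per subscription.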

SubscriberClass looks like this (fill in your blanks):

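// T is the type of value received from upstream, U the type passed downstream
class SubscriberClass<T, U> extends Subscriber<T> {

  constructor(subscriber: Subscriber<U>) {
    super(subscriber);
  }

  _next(value: T): void {
    // this.destination.next(someVal) is how a value is passed to the next pipe operator
    // it does not necessarily need to be in _next. It could, for example, be at
    // the end of an async callback or Promise chain
    this.destination.next!(modify(value));
    // 'modify' here represents some hypothetical modification to 'value', if any
  }

  _complete(): void {
    // fill in; call this.destination.complete!() to pass completion downstream,
    // because an empty override swallows it
  }

  _error(err: Error): void {
    // fill in; call this.destination.error!(err) to pass the error downstream
  }
}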
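
The comment in `_next` says the forwarding call does not have to live in `_next`. As a minimal sketch of that point, again without RxJS, the class below stays silent on every incoming value and forwards a single total only when the source completes. `Sink` and `SumOnComplete` are hypothetical names made up for this example; in a real Subscriber subclass the equivalent calls would go through `this.destination`.

```typescript
// Hypothetical stand-in for the downstream subscriber (`this.destination` in RxJS).
interface Sink<T> {
  next(value: T): void
  complete(): void
}

// Accumulates inputs and forwards one total when the source completes.
class SumOnComplete implements Sink<number> {
  private total = 0
  private destination: Sink<number>

  constructor(destination: Sink<number>) {
    this.destination = destination
  }

  next(value: number): void {
    this.total += value // nothing is forwarded here
  }

  complete(): void {
    this.destination.next(this.total) // forward once, at the very end
    this.destination.complete() // and pass completion along explicitly
  }
}

const log: string[] = []
const summer = new SumOnComplete({
  next: (value) => {
    log.push(`next:${value}`)
  },
  complete: () => {
    log.push("complete")
  },
})

summer.next(1)
summer.next(2)
summer.next(3)
summer.complete()
// log is now ["next:6", "complete"]
```

The same shape works when the forwarding happens inside a timer callback or at the end of a Promise chain: the class decides when, and whether, the next operator hears anything.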

Lindquist uses this construction to reproduce several of the canonical operators: mergeMap, switchMap and concatMap.

Practically, it can be used like this:

const audioPlayerHandler = makeRxJSOperator(AudioPlayer)
const audioRecordHandler = makeRxJSOperator(Recorder)
const blobToBase64 = makeRxJSOperator(BlobToBase64)
const speechToTextHandler = makeRxJSOperator(SpeechToText)

// which are components of this interaction chain, rather like a Promise chain:
buttonClick$
  .pipe(
    audioRecordHandler,
    audioPlayerHandler,
    blobToBase64,
    speechToTextHandler
  )
  .subscribe((x) => console.log("dataresult:", x))

I have actually written the chain above; the Recorder Subscriber class is reproduced here as an example:

// This class accepts the state of a 'record' button
// through its _next method
// and then causes side effects depending on the toggle state
class Recorder extends Subscriber<string> {
  private _stream: MediaStream = new MediaStream()
  private _recorder: MediaRecorder = new MediaRecorder(new MediaStream())

  // The destination receives the recorded audio as a Blob
  constructor(subscriber: Subscriber<Blob>) {
    super(subscriber)
  }

  _next(buttonState: string) {
    switch (buttonState) {
      case "RECORD":
        this.start()
        break
      case "STOP":
        this.stop()
        break
    }
  }

  private onDataAvailable = (event: BlobEvent) => {
    const blob = new Blob([event.data], { type: "audio/ogg;codecs=opus" })
    this.destination.next!(blob)
  }

  // If the user wants to 'RECORD', the microphone is accessed and begins
  // recording audio. A new 'MediaRecorder' instance is created and a
  // handler attached to its 'dataavailable' event
  public start = () => {
    navigator.mediaDevices
      .getUserMedia({ audio: true })
      .then((stream) => (this._stream = stream))
      .then(
        () =>
          (this._recorder = new MediaRecorder(this._stream, {
            audioBitsPerSecond: 12000,
          }))
      )
      .then(() =>
        this._recorder.addEventListener("dataavailable", this.onDataAvailable)
      )
      .then(() =>
        console.log(
          "mime:",
          this._recorder.mimeType,
          this._recorder.audioBitsPerSecond
        )
      )
      .then(() => this._recorder.start())
      // Pass failures downstream, e.g. when the user denies microphone access
      .catch((err) => this.destination.error!(err))
  }

  // When the user presses 'STOP', recording stops and, through
  // event handling (see onDataAvailable), the raw audio data is passed
  // to the next observer in the chain.
  public stop = () => this._recorder.stop()
}

A lot of state is encapsulated and disposed of within this class. It is difficult to even imagine how one would accomplish this using the provided RxJS operators. For instance, navigator.mediaDevices.getUserMedia should never be called except in response to user interaction; otherwise the user sees an annoying permission popup immediately after landing on the page. Without a laborious workaround, that call would be made as soon as .subscribe is called on the entire chain.
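
The laziness described here can be sketched without a browser or RxJS. In the toy below, `openMicrophone` is an injected, hypothetical stand-in for the `getUserMedia` call (synchronous, for simplicity), and `Sink` and `LazyRecorder` are made-up names as well. The point it illustrates is that constructing the class touches nothing; the device is only requested once a "RECORD" value arrives.

```typescript
// Hypothetical stand-in for the downstream subscriber.
interface Sink<T> {
  next(value: T): void
}

type ButtonState = "RECORD" | "STOP"

// Holds the recording session privately and only opens the (fake) microphone on demand.
class LazyRecorder implements Sink<ButtonState> {
  private destination: Sink<string>
  private openMicrophone: () => string
  private session: string | null = null

  constructor(destination: Sink<string>, openMicrophone: () => string) {
    this.destination = destination
    this.openMicrophone = openMicrophone
  }

  next(state: ButtonState): void {
    if (state === "RECORD" && this.session === null) {
      // The only place the device is requested
      this.session = this.openMicrophone()
    } else if (state === "STOP" && this.session !== null) {
      // Hand the finished clip downstream and dispose of the session
      this.destination.next(this.session)
      this.session = null
    }
  }
}

let microphoneRequests = 0
const clips: string[] = []

const recorder = new LazyRecorder(
  {
    next: (clip) => {
      clips.push(clip)
    },
  },
  () => {
    microphoneRequests += 1
    return `clip-${microphoneRequests}`
  }
)

const requestsAfterSetup = microphoneRequests // 0: wiring things up asked for nothing

recorder.next("RECORD") // first user interaction: the microphone is requested now
recorder.next("STOP") // the clip is passed downstream
// microphoneRequests is now 1 and clips is ["clip-1"]
```

A real version would be asynchronous, as in the Recorder class, but the shape is the same: the side effect sits behind the value that triggers it.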

All that said, I am still very much a newbie when it comes to RxJS, so perhaps I would find all of this baroque or unnecessary were I more of an expert. I absolutely welcome instruction if you know a better way.

Also, as an aside, Lindquist uses Quokka to show immediate feedback from his code changes, which is rather nifty.
