From RxJS in Action by Paul P. Daniels and Luis Atencio

This article describes how RxJS components work and how thinking in streams can help you visualize their function.




When writing code in an object-oriented way, we are taught to decompose problems into components, interactions, and states. This breakdown occurs iteratively and on many levels, with each part being further sub-divided into more components, and lastly arriving at a set of cohesive classes that implement a well-defined set of interactions. Hence, in the object-oriented approach, classes are the main unit of work.

Every time a component is created, it will have a state associated with it, and the manipulation of that state in a structured fashion is what advances application logic. For example, a car that stores its position and velocity can update its position continuously to simulate movement. For another example, consider a typical banking application. Banking systems contain modules that encapsulate not only the business logic associated with withdrawing, depositing, and transferring money, but also domain models that store and manage other properties such as account and user profiles. It’s the manipulation of this state (its behavior) that causes the data to move from one state to the next. In other words, behavior is driven by the continuous mutation of a system’s state. The units are the classes responsible for modeling accounts, users, money, and others.

RxJS components work a bit differently. In RxJS, and in reactive programming in general, the fundamental unit of work is the stream. Think in terms of streams (think reactively) and design code so that, instead of holding on to data, you allow it to flow through and apply transformations along the way until it reaches your desired state. With this model, you can handle different types of data sources, whether static or dynamic, as RxJS streams using a consistent computational model based on the Observable data type.

Unlike other JavaScript libraries, however, using RxJS in your application means much more than implementing new APIs; it means that you must approach your problems not as the sum of the set of states in classes, but as a sequence of data that continuously travels from producers to consumers. This way of thinking places the notion of time at the forefront: time runs as the undercurrent through the components of an RxJS stream, so data is never stored but instead flows through transiently. Relating this to a real-world physical stream, we often think of the data source as the top of the stream and the data consumer as the bottom of the stream. In this way, we can think of data as always traveling downstream, in a single direction like water in a river, and along the way you can have dams in charge of transforming the nature of this stream. Thinking this way will help you understand how data should move through an application.

This is not to say that this understanding will come easily; like any skill, it must be built up over time and through iterative application of the concepts. The notion of data in motion is a difficult one for most people to wrap their heads around. To begin building this foundation, this article lays the groundwork you need to better understand streams. Many of the basic principles behind reactive programming derive from functional programming, so let’s start there.


Functional programming as the pillar of reactive programming

Functional programming is the foundation on top of which the abstractions that support reactive programming are built. If you have a background in functional programming, you are free to skip this section; however, we recommend you read along because it will help you understand some of the design decisions behind RxJS.

We ask you to take another quick glance at the main website for the Reactive Extensions project. In it, you will find the following definition:

[RxJS] is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming


Functional programming

Functional programming is a software paradigm that emphasizes the use of functions to create programs that are declarative, immutable, and free of side effects. Did you say immutable? We agree with you: The notion of a program that doesn’t ever change state is a bit mind bending. After all, that’s why we put data in variables and modify them to our heart’s content.

All of the object-oriented or procedural application code you’ve written so far relies on changing and passing variables back and forth to solve problems. So, how can we accomplish the same goals without this? Take the example of a clock. When a clock goes from 1:00 pm to 2:00 pm, it’s undoubtedly changing, isn’t it? To frame this from a functional point of view, we argue that instead of the clock mutating every second, it’s best to return new clock instances every second. Theoretically, both would arrive at the same time.
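The clock idea can be sketched in a few lines of JavaScript. The names createClock and tick are hypothetical, chosen only for illustration; they aren't part of RxJS or any other library.

```javascript
// Hypothetical helpers for illustration; not part of any library.
// A clock is a frozen (read-only) value holding seconds since midnight.
const createClock = seconds => Object.freeze({ seconds });

// tick never modifies its argument; it returns a brand-new clock.
const tick = clock => createClock((clock.seconds + 1) % 86400);

const onePm = createClock(13 * 3600);

// Advance one hour by producing 3,600 successive clock values.
const twoPm = Array.from({ length: 3600 }).reduce(tick, onePm);

console.log(onePm.seconds / 3600); //-> 13 (the original is untouched)
console.log(twoPm.seconds / 3600); //-> 14
```

Both approaches arrive at 2:00 pm, but here every intermediate clock is a separate value that no other code can change behind your back.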

RxJS borrows lots of principles from FP, particularly in terms of function chaining and lazy evaluation, which are the two design decisions that drive the development of RxJS stream programming. Before we dive in, we’ll explain the main parts of the FP definition we just gave, and then show you a quick example involving arrays.

Functional programs are:

  1. Declarative: Functional code has a very peculiar style, which takes advantage of JavaScript’s higher-order functions to apply specialized business logic. Function chains (also known as pipelines) describe data transformation steps in a very idiomatic manner. Most people see SQL syntax as a perfect example of declarative code.
  2. Immutable: An immutable program (and by this we mean any immutable function, module, or whole program) is one that never changes or modifies data after it’s been created, or after its variables have been declared. This can be a very radical concept to grasp, especially coming from an object-oriented background. Functional programs treat data as immutable, constant values. A familiar example is the String type: none of its operations change the string they operate on; rather, they all return new strings.
  3. Side effect free: Functions with side effects depend on data residing outside of their own local scope. A function’s scope is made up of its arguments and any local variables declared within. Interacting with anything outside of this scope, such as reading a file, writing to the console, or rendering elements on an HTML page, is considered a side effect and should be avoided or, at the very least, isolated.
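As a small sketch of the last two properties, the snippet below contrasts string immutability with a function that reads and writes a variable outside its scope. The names lib, impureIncrement, and pureIncrement are made up for this illustration.

```javascript
// Immutable: string methods return new strings; the original is untouched.
const lib = 'rxjs';
const shout = lib.toUpperCase();
console.log(lib, shout); //-> rxjs RXJS

// Side effect: the result depends on, and changes, a variable outside the function.
let counter = 0;
const impureIncrement = () => ++counter;

// Side-effect free: the result depends only on the argument.
const pureIncrement = value => value + 1;

console.log(impureIncrement(), impureIncrement()); //-> 1 2
console.log(pureIncrement(0), pureIncrement(0));   //-> 1 1
```

Calling the impure version twice with the same (empty) input gives two different answers, while the pure version always returns the same result for the same argument.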

In general, mutations and side effects make functions unreliable and unpredictable. That is to say, if a function alters the contents of an object, it will compromise other functions that expect this object to keep its original state. Figure 1 shows you the dependency between two functions doWork() and doMoreWork() through shared data.



Figure 1 Function doWork() is temporally coupled to doMoreWork() due to the dependency on shared state (side effect). Hence, doWork() must be called before doMoreWork() or the program ceases to work.
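The situation in figure 1 might look something like the following. This is a minimal sketch; the shared variable and the bodies of the two functions are assumptions made for illustration, since the figure only names the functions.

```javascript
// Hypothetical sketch of figure 1; the function bodies are assumptions.
let shared = null; // state living outside both functions

function doWork() {
  shared = { items: [1, 2, 3] }; // side effect: writes shared state
}

function doMoreWork() {
  return shared.items.length; // side effect: reads shared state
}

// Works only in this order; calling doMoreWork() first would throw a TypeError.
doWork();
console.log(doMoreWork()); //-> 3

// A side-effect-free alternative passes the data explicitly.
const work = () => ({ items: [1, 2, 3] });
const moreWork = data => data.items.length;
console.log(moreWork(work())); //-> 3
```

In the second version, the dependency is visible in the function signature, so each function can be called and tested on its own.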


This coupling presents an issue because doMoreWork() now relies on doWork() running first, so testing can’t be done in isolation, as it should be. The example in figure 1 is a very obvious side effect, but not all of them are this clear-cut. Consider this trivial function to compute the lowest value in an array:

function lowest(arr) {
  // sort() takes a comparator function; this one orders numbers ascending
  return arr.sort((a, b) => a - b)[0];
}

While this code seems harmless, it hides a terrible side effect. Can you spot it? Array sort() works in place, so this function actually mutates the array that was passed in:

var source = [3, 1, 9, 8, 3, 7, 4, 6, 5];
var result = lowest(source); //-> 1
console.log(source); //-> [1, 3, 3, 4, 5, 6, 7, 8, 9] ❶

❶ The original array changed!
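One way to remove this side effect is to sort a copy of the array, or to avoid sorting altogether. The names lowestPure and lowestMin below are hypothetical and used only for this sketch.

```javascript
// Copy the array before sorting so the caller's array is left alone.
function lowestPure(arr) {
  return [...arr].sort((a, b) => a - b)[0];
}

// Or skip sorting entirely; Math.min doesn't modify its arguments.
const lowestMin = arr => Math.min(...arr);

const source = [3, 1, 9, 8, 3, 7, 4, 6, 5];
console.log(lowestPure(source)); //-> 1
console.log(lowestMin(source));  //-> 1
console.log(source); //-> [3, 1, 9, 8, 3, 7, 4, 6, 5] (unchanged)
```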

Matters would get worse if this were a concurrent application where data structures are shared among different components. If these functions were to modify data in separate threads, you would need robust synchronization mechanisms to ensure they execute and mutate this state in the right order, or you’d experience unpredictable behavior much like the surprise in the simple function above.

Fortunately, JavaScript is single-threaded, so you don’t need to worry about shared state being modified by different threads. But we still deal quite often with concurrent code when working with web workers or simply when making remote HTTP calls. Functions with side effects are sensitive to order of execution and can’t be evaluated in an arbitrary order; this also applies to asynchronous functions that read or change external variables at unpredictable times. Consider the trivial, yet frequent, use case illustrated in figure 2, which involves asynchronous code mixed with synchronous code. This presents a tremendous challenge because the latter assumes that functions executing before it have completed successfully, which might not be the case if there’s some latency:



Figure 2 Function doAsyncWork() can be an example of a remote call that fetches data from the server. Suppose this call has a latency of around 1 second, depending on network conditions. Immediately after, the next function, doMoreWork(), runs expecting that a piece of shared data has already been initialized. Due to this latency, the shared data has not been initialized, and the execution of doMoreWork() is compromised.


In this case, doAsyncWork() fetches some data from the server, which never takes a constant amount of time. So, doMoreWork() fails to run properly because it reads data that hasn’t been initialized yet. Of course, you can try to implement your own timeouts in order to anticipate latency, but dealing directly with time yourself is a recipe for disaster; your code will be extremely brittle and hard to maintain, and it will cause you to come in to work on a weekend when your application is experiencing slightly more traffic than usual. Working with data immutably, using functional programming with a library like RxJS, can make these timing issues disappear.
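The timing problem from figure 2 can be reproduced with a plain Promise. In this sketch, fetchData is a hypothetical stand-in for the remote call (setTimeout simulates the latency), and the function bodies are assumptions made for illustration.

```javascript
// Hypothetical stand-in for a remote call; setTimeout simulates latency.
const fetchData = () =>
  new Promise(resolve => setTimeout(() => resolve([1, 2, 3]), 50));

let shared; // shared state that doAsyncWork fills in later

function doAsyncWork() {
  return fetchData().then(data => {
    shared = data;
    return data;
  });
}

// Reads shared state and fails if it hasn't been initialized yet.
const doMoreWork = () => shared.length;

// Problem: the synchronous call runs before the data has arrived.
const pending = doAsyncWork();
let ranTooEarly = false;
try {
  doMoreWork();
} catch (e) {
  ranTooEarly = true;
  console.log('doMoreWork ran too early:', e.name); // TypeError
}

// Alternative: pass the data along the chain instead of sharing it.
const countItems = data => data.length;
const result = pending.then(countItems);
result.then(count => console.log(count)); // logs 3 once the data arrives
```

The second half doesn't guess how long the call will take. The dependent step simply runs when its input exists, which is the same idea that streams generalize to many values over time.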

Even though JavaScript is not a pure functional language, with a bit of discipline and the help of the proper libraries, you can use it completely functionally. There is also some native support in the language, such as the const keyword, which prevents a variable from being reassigned (although it doesn’t make the referenced object immutable), and the Array methods map, reduce, filter, and others, which process collections of objects in a very idiomatic way. Listing 2 shows a simple program that takes an array of numbers, extracts the even numbers, computes their squares, and sums their total.


Listing 2 Processing collections with map, reduce, and filter

var isEven = num => num % 2 === 0;
var square = num => num * num;
var add = (a, b) => a + b;

var arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

arr.filter(isEven).map(square).reduce(add); //-> 220

If you imagine for a second having to write this program using a non-functional or imperative approach, you’ll probably need to write a loop, a conditional statement, and a few variables keeping track of things. Functional programming, on the other hand, raises the level of abstraction and encourages a style of declarative coding that clearly states the purpose of a program, describing what it does and not how it does it. Nowhere in this short program is the presence of a loop, if/else, or any imperative control flow mechanism.
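For comparison, an imperative version of the same computation might look like this. It's only one of many ways to write it; the variable names are chosen for this sketch.

```javascript
// Imperative version of listing 2, for comparison.
const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

let total = 0; // accumulator mutated on every matching pass
for (let i = 0; i < numbers.length; i++) {
  if (numbers[i] % 2 === 0) {         // conditional: keep only even numbers
    total += numbers[i] * numbers[i]; // square and add in the same step
  }
}

console.log(total); //-> 220
```

The result is the same, but the filtering, squaring, and summing steps are tangled together inside the loop body rather than named one by one.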

In fact, one of the main themes in FP, which you’ll use in reactive programming as well, is learning to program without loops. In listing 2, we took advantage of map, reduce, and filter to abstract the looping away from you; these are known as higher-order functions. These functions also don’t mutate the array they’re called on: filter and map return a new array at each step of the way, keeping the original intact. Because loops cause mutations to occur by means of updating a loop counter, FP also employs recursion as the main mechanism to traverse any type of data structure in a side-effect-free manner. The other reason for eliminating loops is that loops are designed to work well with synchronous data. Once you start fetching data asynchronously, loops begin to break down, and you have to resort to creating artificial closures in your code and making sure the order of operations is correct.
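As a small illustration of recursion replacing a loop counter, here is one possible recursive sum. The sum function is a hypothetical helper written for this sketch; note that most JavaScript engines don’t optimize deep recursion, so this style suits modest input sizes.

```javascript
// Recursive sum: no loop counter and no variable is ever reassigned.
// Destructuring splits the array into its first element and the rest.
const sum = ([head, ...tail]) =>
  head === undefined ? 0 : head + sum(tail);

const squares = [4, 16, 36, 64, 100];
console.log(sum(squares)); //-> 220
console.log(squares);      //-> [4, 16, 36, 64, 100] (unchanged)
```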

In the example above, because these operations are side-effect-free, this program will always produce the same value (220) given the same input array. Side-effect-free functions are also known as pure, and the main idea behind this is that they are predictable when working on collections of objects or streams. You should always strive for purity whenever possible as it makes your programs easy to test and reason about.

Want to learn more about functional programming?

JavaScript’s Array object has a special place in functional programming because it behaves as an extremely powerful data type called a functor. In a simple sense, functors are containers that can wrap data and expose a mapping method that allows you to immutably apply transformations on this data, as seen by the Array.map() method. RxJS streams follow this same functor-like design.

Functional programming is a huge subject to cover. So, for more information, you can read Functional Programming in JavaScript (Manning 2016) by Luis Atencio.

The code shown in listing 2, which works very well with arrays, also translates to streams. Continuing with our pseudo Stream data type, consider now how we can process a stream of numbers:

Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .filter(isEven)
  .map(square)
  .reduce(add)
  .subscribe(console.log); //-> 220

You can clearly see here how Rx was inspired by functional programming. All we had to do was wrap the array in a stream and then subscribe to it to listen for the computed values. This is the same as saying that streams are containers into which you can lift data or events, so that you can apply sequences of operations until reaching your desired outcome. Fortunately, you’re already very familiar with this concept from working with arrays. You can lift a value into an array and map any functions onto it. Suppose you declare some simple functions on strings like toUpper, slice, and repeat:

['rxjs'].map(toUpper).map(slice(0, 2)).map(repeat(2)); //-> ['RXRX']
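The article doesn’t show how toUpper, slice, and repeat are defined. One plausible way to declare them, assuming curried helpers that wrap the built-in String methods, is:

```javascript
// Assumed definitions; each helper returns a new string and changes nothing.
const toUpper = str => str.toUpperCase();
const slice = (start, end) => str => str.slice(start, end);
const repeat = times => str => str.repeat(times);

const result = ['rxjs']
  .map(toUpper)      // ['RXJS']
  .map(slice(0, 2))  // ['RX']
  .map(repeat(2));   // ['RXRX']

console.log(result); //-> ['RXRX']
```

Because slice and repeat take their configuration first and the string last, calling slice(0, 2) produces a one-argument function that fits directly into map.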

The ancient Greek philosopher Heraclitus once said, “You can never step into the same river twice.” He formulated this statement as part of his doctrine on change and motion being central components of the universe — everything is constantly in motion. This epic realization is what RxJS streams are all about. Despite being dynamic, streams are actually immutable data types. Once a stream is declared to wrap an array, listen for mouse clicks, or respond to an HTTP call, you can’t mutate it or add a new value to it afterwards — you must do it at the time of declaration. Hence, we’re specifying the dynamic behavior of an object or value declaratively and immutably.

Moreover, the business logic of this program is pure and takes advantage of side-effect-free functions that are mapped onto the stream to transform the produced data into the desired outcome. The advantage of this is that all side effects are isolated and pushed onto the consumers (logging to the console in this case). This separation of concerns is ideal and keeps our business logic clean and pure. A typical RxJS program has the logical components shown in figure 3:



Figure 3 Events emitted by producers are pushed through a pipeline of side-effect-free functions, which implement the business logic of your program. This data flows to all observers in charge of consuming and displaying it.


Another design principle of streams that’s borrowed from functional programming is lazy evaluation. In the example below, the stream sits idle until a subscriber (a consumer) is attached to it; only then will it emit the values 1 to 10.

Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .filter(isEven)
  .map(square)
  .reduce(add); ❶

❶ Nothing actually runs here until a subscriber is added

When a subscriber begins listening, the stream will emit events downstream through the pipeline from the producer to the consumer. This is beneficial in the event that your functions have side effects because the pipeline runs in a single direction, helping you to ensure the synchronous order of your function calls.
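To make the laziness concrete, here is a toy stream written from scratch. It is not how RxJS is implemented, and ToyStream and its from method are hypothetical names; the sketch only shows that building a pipeline does no work until subscribe is called.

```javascript
// Toy stream for illustration only; this is not the RxJS implementation.
class ToyStream {
  // producer: a function that pushes values into an observer when invoked
  constructor(producer) {
    this.producer = producer;
  }
  static from(values) {
    return new ToyStream(observer => {
      values.forEach(v => observer.next(v));
      observer.complete();
    });
  }
  filter(predicate) {
    return new ToyStream(observer => this.producer({
      next: v => { if (predicate(v)) observer.next(v); },
      complete: () => observer.complete()
    }));
  }
  map(fn) {
    return new ToyStream(observer => this.producer({
      next: v => observer.next(fn(v)),
      complete: () => observer.complete()
    }));
  }
  // Emits a single accumulated value once the source completes.
  reduce(fn) {
    return new ToyStream(observer => {
      let acc;
      let hasAcc = false;
      this.producer({
        next: v => { acc = hasAcc ? fn(acc, v) : v; hasAcc = true; },
        complete: () => {
          if (hasAcc) observer.next(acc);
          observer.complete();
        }
      });
    });
  }
  // Only here does the chain of producers actually start running.
  subscribe(next) {
    this.producer({ next, complete: () => {} });
  }
}

const isEven = num => num % 2 === 0;
const square = num => num * num;
const add = (a, b) => a + b;

const seen = []; // records every value that reaches the filter step
const pipeline = ToyStream.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .filter(num => { seen.push(num); return isEven(num); })
  .map(square)
  .reduce(add);

console.log(seen.length);        //-> 0 (nothing has run yet)
pipeline.subscribe(console.log); //-> 220
console.log(seen.length);        //-> 10
```

Each operator returns a new stream that merely remembers what to do; the values only start moving from the producer toward the consumer when a subscriber is attached.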

Functional programs are also lazily evaluated. Lazy evaluation simply means that code is never called until actually needed. In other words, functions won’t evaluate until their results are used as part of some other expression. This is a mandatory requirement for streams because they can emit data indefinitely to handle mouse movements, key presses, and so on; buffering the entire sequence of mouse movements in memory could make your programs crash. Here are two use cases of infinite streams; without lazy evaluation, code like this would cause the application to run out of memory:

//1
Stream.range(1, Number.POSITIVE_INFINITY)
  .take(100)
  .subscribe(console.log);

//2
Stream.fromEvent('clicks')
  .map(e => [e.clientX, e.clientY])
  .subscribe(console.log);

In example 1 above, lazy evaluation means the stream never needs to run through the entire infinite sequence of numbers before taking the first 100. And even if the number to take is big, streams will not persistently hold on to data; instead, any data emitted is immediately broadcast to all subscribers at the moment it gets generated. In example 2, imagine if we needed to store all of a user’s clicks in memory; this could potentially take up a huge amount of space. Instead of holding on to this data, RxJS lets it flow freely and uses the Iterator pattern to traverse any type of data source irrespective of how it gets emitted.
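The same laziness can be seen with plain JavaScript generators, which implement the language’s iterator protocol. This sketch is pull-based (the consumer asks for each value), whereas RxJS pushes values to subscribers, so it illustrates the on-demand evaluation rather than RxJS’s own mechanism. The range and take functions are hypothetical helpers written for this example.

```javascript
// Lazy, potentially unbounded sequence of integers; each value is
// computed only when the consumer asks for it.
function* range(start, end = Infinity) {
  for (let n = start; n <= end; n++) {
    yield n;
  }
}

// Pulls at most `count` values from any iterable and then stops.
function* take(count, iterable) {
  if (count <= 0) return;
  let taken = 0;
  for (const value of iterable) {
    yield value;
    if (++taken >= count) return;
  }
}

// The infinite range is never materialized; only five values are produced.
const firstFive = [...take(5, range(1))];
console.log(firstFive); //-> [1, 2, 3, 4, 5]
```

No array of all the numbers ever exists in memory; each value is generated, handed over, and then forgotten.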

