
In this article, excerpted from Spark in Action, we talk about RDD, the fundamental abstraction in Spark.

 

Resilient Distributed Dataset (RDD) is the fundamental abstraction in Spark. It represents a collection of elements that is:

  • immutable (read-only)
  • resilient (fault-tolerant)
  • distributed (dataset spread out to more than one node)

RDDs support a number of operations that do useful data manipulation, but they always yield a new RDD instance. Once created, they never change, thus the adjective immutable. Mutable state is known to introduce complexity, but besides that, having immutable collections allows Spark to provide important fault-tolerance guarantees in a straightforward manner.

The fact that the collection is distributed on a number of machines (execution contexts, JVMs) is transparent[1] to its users, so working with RDDs is not much different than working with ordinary local collections, like plain old lists, maps, sets, etc.

We can summarize that the purpose of RDDs is to facilitate parallel operations on large datasets in a straightforward manner, abstracting away their distributed nature and inherent fault tolerance.

RDDs are resilient because of Spark’s built-in fault recovery mechanics. Spark is capable of healing RDDs in case of node failure. While other distributed computation frameworks facilitate fault tolerance by replicating data to multiple machines (so it can be restored from healthy replicas once a node fails), RDDs are different: they provide fault tolerance by logging the transformations used to build a dataset (how it came to be) rather than the dataset itself. If a node fails, only the subset of the dataset that resided on the failed node needs to be recomputed.
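If you are curious, you can inspect the lineage Spark records for an RDD by calling its toDebugString method. Here is a minimal sketch (assuming a running spark-shell, where sc is already available):

// Spark logs how the RDD was built (its lineage), not the data itself
val lines = sc.parallelize(Seq("first line", "second line"))
val upper = lines.map(line => line.toUpperCase)
println(upper.toDebugString)  // prints the chain of parent RDDs, ending in the root ParallelCollectionRDD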


Transformations and actions

There are two types of RDD operations: transformations and actions. Transformations (e.g., filter, map, union) are operations that produce a new RDD by performing some useful data manipulation on another RDD. Actions (e.g., count or saveAsTextFile) trigger a computation in order to return a result to the calling program or to write it to stable storage.

It is important to understand that transformations are evaluated lazily, meaning that computation does not take place until you invoke an action. Once an action is triggered on an RDD, Spark examines the RDD’s lineage and uses that information to build a “graph of operations” that needs to be executed in order to compute the action. You can think of transformations as a sort of diagram that tells Spark which operations need to happen, and in which order, once an action gets executed. This “graph of operations” is called a Directed Acyclic Graph (DAG), after the graph data structure of the same name. We discuss the DAG in more detail later in the book; it is not essential for grasping the concepts that follow.
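To see this laziness for yourself, here is a minimal spark-shell sketch: the two transformations below are merely recorded, and nothing is computed until the action on the last line runs.

val nums = sc.parallelize(1 to 1000)
val doubled = nums.map(n => n * 2)        // transformation: recorded, not executed
val small = doubled.filter(n => n <= 10)  // still nothing has been computed
small.count()                             // action: only now does Spark execute the DAG (returns 5)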

Enough theory for now. To get a feel of RDD’s Scala API, let’s go through some basic RDD transformations and actions in your spark-shell.


Basic RDD actions and transformations

Even though you have probably already seen it in action, we will start with perhaps the most basic of transformations, filter. Arm yourself with your spark shell and follow along. Feel free to experiment, turn things around and make some mistakes. That way you will learn much more efficiently.


filter

Featuring foreach and count.

Looking at the Spark API[2], you can see that the signature of the filter function (filter method of the RDD class, to be more precise) looks like this:

class RDD[T] {                           #A
  // ... other methods ...

  def filter(f: (T) => Boolean): RDD[T]  #B

  // ... other methods ...
}

#A RDD is defined as a class with parameterized type T (we skipped the irrelevant parts)

#B filter takes another function as a parameter.


def is Scala’s keyword for declaring a function (just as val declares an immutable variable and var declares a mutable one).
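For illustration (these names are just examples, not part of any API):

def isPositive(num: Int): Boolean = num > 0  // def declares a function
val limit = 10                               // val declares an immutable variable
var counter = 0                              // var declares a mutable variable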

You can read the function signature like this: “Declare a function called filter that takes some other function as a parameter and returns an RDD. The RDD that is returned contains elements of the same type as the RDD on which filter was called.”

Of course, the RDD that gets created must be of the same type as the RDD on which filter is called: filter only drops some of the RDD’s elements and leaves the others unchanged (including their type). The remaining elements are used to construct a new RDD, which is then returned as the result of the filter method. The original RDD, the one on which filter was called, remains unchanged.


Remember

filter (and all other transformations) is always invoked on some existing, live RDD instance, which we will refer to as this, or “calling RDD” from now on.


Another thing the declaration says is that the function you provide as an argument must itself take a single parameter of type T and return a Boolean. By using the same letter (T) for both, the API tells you that the parameter must have the same type as the elements of the calling RDD (this). That sounds more complicated than it is; in fact it makes perfect sense, since the fat arrow function’s whole purpose in life is to examine each of the calling RDD’s elements, one by one (that is why its parameter has the same type as the RDD’s elements), and decide for each of them whether it stays in the newly created RDD or not (hence it returns true or false).

For example, if you invoke filter on an RDD of Ints, the fat arrow function must take a single Int (and evaluate some arbitrary test on it) and return true or false. The resulting RDD will have elements of type Int (the ones for which the fat arrow function returned true).

Here is a simple example, just so you can see these things in context. We recommend that you follow along in your REPL.

scala> val nums = sc.parallelize(1 to 10)                #A
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:21

scala> val numsEven = nums.filter(num => num % 2 == 0)   #B
numsEven: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at filter at <console>:23

scala> numsEven.foreach(x => println(x))                 #C
2
4
6
8
10

#A the parallelize method of SparkContext creates a new RDD by distributing an existing Scala collection. The expression 1 to 10 returns a collection of Ints: 1, 2, 3, ... 9, 10

#B an element is even if it is a multiple of 2 (i.e., the expression element % 2 has no remainder). % stands for modulo[3]

#C for each x in the numsEven collection, call println(x)


Predicate function

In this context, our anonymous, fat arrow function is called a predicate. Let’s see what the dictionary says about the predicate noun:

    That which is affirmed or denied.

In functional programming, we define predicate as a function that returns true or false, depending on whether its argument has certain properties.

Yep, that is exactly what our fat arrow function does.

Functions are “first-class citizens” in Scala, meaning that they can be stored in a variable, passed to another function as a parameter, or returned by another function.

Just as a side note (which you don’t need to remember), we should mention that in functional programming in general, a function that takes another function as a parameter (or returns a function as its result) is called a higher-order function.
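Here is a quick sketch in plain Scala (the names are ours, purely for illustration): applyToAll is a higher-order function, just like filter, because it takes another function as a parameter.

def applyToAll(nums: List[Int], f: Int => Int): List[Int] = nums.map(f)

applyToAll(List(1, 2, 3), x => x * 10)  // returns List(10, 20, 30)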


Our fat arrow predicate function is anonymous, but you could just as well have first defined an equivalent named function, like this:

scala> def isEven(num: Int) = { num % 2 == 0 }
isEven: (num: Int)Boolean

… or stored (a reference to) the function definition in a variable, like this:

scala> val isEvenInVal = (num: Int) => num % 2 == 0
isEvenInVal: Int => Boolean = <function1>

… and then used it in place of our predicate function:

scala> val numsEven1 = nums.filter(isEvenInVal)
numsEven1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:25

scala> numsEven1.count
res3: Long = 5

So you’ve seen that filter is used to conditionally remove[4] some elements from an RDD.

 


[1] Well, almost transparent: in order to optimize computation and thus gain performance benefits, there are ways to control dataset partitioning (how an RDD is distributed among the nodes in a cluster) and its persistence options. We talk about both features extensively later in the book.

[4] RDDs are immutable, remember? So when we say “remove”, we actually mean “create a new RDD in which some elements are conditionally missing, compared to the RDD you started with, i.e. the one on which filter was called”.