From Spark with Java by Jean Georges Perrin
You’ve probably seen a simple use case where Spark ingests data from a CSV file, performs a simple operation, and then stores the result in a database. In this article, you’re going to see what happens behind the scenes.
Walking through your application
This article explores how Spark works behind the scenes. To start, we’ll look a little more closely at the first typical operation: the connection to the master. After this non-functional step, let’s walk through the ingestion, the transformation, and, finally, the publishing of the data in the RDBMS.
Connecting to a master
For every Spark application, the first operation is to connect to the Spark master and get a Spark session. This is an operation you’ll do every time. This is illustrated by the code fragment of Listing 1 and Figure 1.
In this context, you’re connecting to Spark in local mode.
Listing 1 Getting a Spark session
SparkSession spark = SparkSession.builder()
    .appName("CSV to DB")
    .master("local")
    .getOrCreate();
Method chaining makes Java more compact.
In recent years, more and more Java APIs have adopted method chaining, as in SparkSession.builder().appName(…).master(…).getOrCreate(). You may have seen code where more intermediate objects were created, a bit like:
Object1 o1 = new Object1();
Object2 o2 = o1.getObject2();
o2.set("something");
Spark’s API uses a lot of method chaining.
Method chaining makes your code more compact and more readable. One major drawback is debugging: imagine a NullPointerException (NPE) in the middle of your chain, and you’ll have to spend more time finding which call caused it.
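To make the debugging trade-off concrete, here is a minimal sketch in plain Java. The Builder and Session classes are hypothetical and only loosely shaped like SparkSession.builder(); they are not Spark code. The sketch shows why chaining works (each method returns the builder) and two habits that help when a chain fails: null checks that name the culprit, and unrolling the chain into one statement per call.

```java
import java.util.Objects;

// Hypothetical builder, loosely shaped like SparkSession.builder(); not Spark code.
class ChainingSketch {

  static class Session {
    final String appName;
    final String master;

    Session(String appName, String master) {
      this.appName = appName;
      this.master = master;
    }
  }

  static class Builder {
    private String appName;
    private String master;

    // Each setter returns the builder itself, which is what allows chaining.
    Builder appName(String appName) {
      // A named null check puts the name of the bad argument in the exception.
      this.appName = Objects.requireNonNull(appName, "appName");
      return this;
    }

    Builder master(String master) {
      this.master = Objects.requireNonNull(master, "master");
      return this;
    }

    Session getOrCreate() {
      return new Session(appName, master);
    }
  }

  public static void main(String[] args) {
    // Chained form: compact, with one call per line.
    Session chained = new Builder()
        .appName("CSV to DB")
        .master("local")
        .getOrCreate();

    // Unrolled form: one statement per call, so a debugger can stop between
    // steps and you can inspect the intermediate object.
    Builder builder = new Builder();
    builder.appName("CSV to DB");
    builder.master("local");
    Session unrolled = builder.getOrCreate();

    System.out.println(chained.appName + " / " + unrolled.master);
  }
}
```

Writing one call per line, as in the chained form above, also helps: the line numbers in a stack trace then point closer to the failing call.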
Local mode isn’t a cluster but it’s much easier
To let you run the example in this article without setting up a full cluster, I specified local as the value for the master: you’re running Spark in local mode.
If you had a cluster, you’d give the address of the cluster instead.
For the sake of building your mental model, assume you have a cluster instead of the local mode.
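One way to avoid hardcoding the master is to read it from the command line and fall back to local mode. The sketch below is plain Java and only computes the string you would pass to master(); the resolveMaster() helper and the host name in the usage note are hypothetical.

```java
class MasterUrlSketch {

  // Returns the value to hand to SparkSession.builder().master(...).
  // Falls back to "local" when no usable argument is given.
  static String resolveMaster(String[] args) {
    if (args.length > 0 && !args[0].trim().isEmpty()) {
      return args[0].trim();
    }
    return "local";
  }

  public static void main(String[] args) {
    System.out.println("Master: " + resolveMaster(args));
  }
}
```

With a standalone cluster, the argument would look like spark://my-master:7077, where my-master stands for your own master’s host name.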
All the illustrations in this article represent a timeline. At t0, you start your application (your main() function) and at t1, you get your session.
Figure 1 The application (or driver) connects to the master and gets a Spark session. The arrow indicates the sequence of events: at t0, you start your application and at t1, you get your Spark session.
The first step is always to connect to a master. You can now ask Spark to load the CSV file.
Loading or ingesting the CSV file
Loading, ingesting, reading are synonyms for what you’re going to do now: ask Spark to load the data contained in the CSV file. Spark can use distributed ingestion through the different nodes of the cluster.
As you can imagine, like all good masters, it doesn’t do much: it relies on slaves, or workers. You’ll find both wordings in the Spark documentation; I suggest you pick one, depending on your level of political correctness. I’ll overcome my inherently French indecisiveness and pick one for this article: worker.
In our scenario, you have three workers. Distributed ingestion means you’ll ask our three workers to ingest at the same time.
Figure 2 The master knows its workers (or slaves). In this illustration, you have three workers. It’s a logical representation: any worker could be on the same physical node as the master. Each worker has memory, which it uses via partitions.
At t2, the master tells the workers to load the file, as coded in listing 2. Natural questions are: if you have three workers, which one loads the file? Or, if they load it simultaneously, how do they know where to start and finish? Spark ingests the CSV file in a distributed way. The file must be on a shared drive, on a distributed file system, or shared through a file-sharing mechanism like Dropbox, Box, Nextcloud/Owncloud, etc.
In this context, a partition is a dedicated area in the worker’s memory.
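To build an intuition for how several tasks can share one file without reading the same record twice, here is a simplified sketch in plain Java. It is an illustration under stated assumptions, not Spark’s actual implementation: the file is cut into equal byte ranges, one per task, and each line belongs to the task whose range contains the line’s first byte. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitSketch {

  // Size of a line on disk: its UTF-8 bytes plus one byte for the newline.
  static long sizeOf(String line) {
    return line.getBytes(StandardCharsets.UTF_8).length + 1L;
  }

  // Gives each line to the task whose byte range contains the line's first byte.
  static List<List<String>> assign(List<String> lines, int tasks) {
    if (tasks < 1) {
      throw new IllegalArgumentException("tasks must be >= 1");
    }
    long total = 0;
    for (String line : lines) {
      total += sizeOf(line);
    }
    long rangeSize = (total + tasks - 1) / tasks; // ceiling of total / tasks

    List<List<String>> partitions = new ArrayList<>();
    for (int i = 0; i < tasks; i++) {
      partitions.add(new ArrayList<>());
    }

    long offset = 0; // byte offset of the current line's first byte
    for (String line : lines) {
      int owner = (int) (offset / rangeSize);
      partitions.get(owner).add(line);
      offset += sizeOf(line);
    }
    return partitions;
  }

  public static void main(String[] args) {
    // The six data lines of the authors file (header left out).
    List<String> lines = Arrays.asList(
        "Pascal,Blaise", "Voltaire,Fran\u00e7ois", "Perrin,Jean Georges",
        "Mar\u00e9chal,Pierre Sylvain", "Karau,Holden", "Zaharia,Matei");
    List<List<String>> partitions = assign(lines, 3);
    for (int i = 0; i < partitions.size(); i++) {
      System.out.println("task " + i + " -> " + partitions.get(i));
    }
  }
}
```

Because the ranges are measured in bytes and not in lines, the tasks do not necessarily receive the same number of records.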
Listing 2 Reading the authors file
Dataset<Row> df = spark.read()
    .format("csv")
    .option("header", "true")
    .load("data/authors.csv");
Let’s take a second to look at our CSV file (see listing 3). It’s a simple file with two columns: lname for the last name and fname for the first name. The first line of the file is a header. The file contains six more lines, which become six rows in our dataframe.
Listing 3 A good ol’ CSV file
lname,fname
Pascal,Blaise
Voltaire,François
Perrin,Jean Georges
Maréchal,Pierre Sylvain
Karau,Holden
Zaharia,Matei
The workers create tasks to read the file. Each worker has access to the node’s memory and assigns a memory partition to the task.
Figure 3. Tasks are being created based on the available resources. The worker may create several tasks and assign a memory partition to the task. The solid tasks are running (they also have a green dot), in contrast with non-working tasks (from other applications for example), which are hollow and have a red dot.
At t4, each task continues by reading a part of the CSV file as illustrated in figure 4. As the task is ingesting its rows, it stores them in a dedicated partition.
Why should you care about partitions and their location?
When your operations are simple, like concatenating two fields into a third one, Spark is fast.
Spark can join data between datasets and can perform aggregations, just as you would with your relational database. Now imagine that you’re joining data in the first partition of worker #1 with data in the second partition of worker #2: all that data must be transferred, which is a costly operation.
Figure 4 shows the record being copied from the CSV file to the partition during the ingestion process, within the R ➧ P (Record to Partition) box. The memory box, in purple, shows which records are in which partition on each worker. In this example, record #1, which contains Blaise Pascal, is in the first partition of the first worker.
Figure 4. The ingestion is taking place: each task is now loading some records into its own memory partition, as suggested by the R ➧ P (Record to Partition) box. You can also see the purple box, which contains the records after the ingestion has been done.
Transforming your data
After the data has been loaded, at t5, you can process the records. Your operation is fairly simple: add a new column, called name, to the dataframe. The full name (column name) is a concatenation of the last name (column lname), a comma, a space, and the first name (column fname). For example, Jean Georges (first name) and Perrin (last name) become Perrin, Jean Georges. Listing 4 describes the process and figure 5 illustrates it.
As Seth Rogen said: “I am lazy, but for some reason, I am so paranoid that I end up working hard.” This is how Spark acts. At this point, you’ve told Spark to concatenate the fields, but it hasn’t done anything yet.
Spark is lazy: it only works when asked. Spark stacks up all your requests and, when it needs to, it optimizes the operations and does the hard work. Like Seth, when you ask nicely, Spark works hard.
In this situation, you’re using the withColumn() method, which is a transformation. Spark only starts processing when it sees an action, such as the write() method in listing 5.
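Java’s own streams behave in a comparable way, which makes them a handy analogy for the transformation/action split. The sketch below is plain Java, not Spark: map() plays the role of a transformation and collect() the role of an action. The withName() helper and the log list are hypothetical names introduced for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazySketch {

  // Records every concatenation that actually runs.
  static final List<String> log = new ArrayList<>();

  // "Transformation": describes the work but runs nothing yet.
  static Stream<String> withName(String[][] rows) {
    return Arrays.stream(rows).map(row -> {
      log.add("concat for " + row[0]);
      return row[0] + ", " + row[1];
    });
  }

  public static void main(String[] args) {
    String[][] rows = {{"Perrin", "Jean Georges"}, {"Karau", "Holden"}};

    Stream<String> names = withName(rows);
    System.out.println("Concatenations after the transformation: " + log.size());

    // "Action": the terminal operation triggers the pending work.
    List<String> result = names.collect(Collectors.toList());
    System.out.println("Concatenations after the action: " + log.size());
    System.out.println(result);
  }
}
```

The first count is zero: nothing is concatenated until the terminal operation asks for the result. The analogy stops there, since a Java stream runs in a single JVM while Spark distributes the work across the workers.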
Listing 4 Adding a column to our dataframe
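// Static imports, at the top of the source file:
// import static org.apache.spark.sql.functions.concat;
// import static org.apache.spark.sql.functions.lit;
df = df.withColumn(
    "name",
    concat(df.col("lname"), lit(", "), df.col("fname")));
Figure 5 At t5, Spark adds the transformation step to your flow. Each task continues to perform its work, getting all first and last names from the memory partition and creating the new name.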
You’re now ready for the last operation: saving the result in the database.
Saving the work done in our dataframe to a database
It’s about time to save your result in the database, after ingesting the CSV file and transforming the data in the dataframe. The code responsible for this operation is in listing 5 and illustrated in figure 6.
Listing 5 Saving the data to the database
String dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs";
Properties prop = new Properties();
prop.setProperty("driver", "org.postgresql.Driver");
prop.setProperty("user", "jgp");
prop.setProperty("password", "Spark<3Java");
df.write()
    .mode(SaveMode.Overwrite)
    .jdbc(dbConnectionUrl, "ch02", prop);
As you’re certainly familiar with JDBC (Java Database Connectivity), you’ve probably noticed that Spark expects similar information:
- a JDBC connection URL
- the name of a driver
- a user
- and a password
The write() method returns a DataFrameWriter object, on which you can chain a mode() method to specify how to write: here, you’ll overwrite the data in the table.
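Besides Overwrite, Spark’s SaveMode enum also offers Append, Ignore, and ErrorIfExists. The toy model below, in plain Java, mimics what each mode means when the target table already exists. It is a sketch with hypothetical names (the Mode enum, the in-memory tables map, the save() helper), not Spark’s implementation, and no database is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SaveModeSketch {

  // Hypothetical enum mirroring the four values of Spark's SaveMode.
  enum Mode { APPEND, OVERWRITE, ERROR_IF_EXISTS, IGNORE }

  // In-memory stand-in for a database: table name -> rows.
  static final Map<String, List<String>> tables = new HashMap<>();

  static void save(String table, List<String> rows, Mode mode) {
    List<String> existing = tables.get(table);
    if (existing == null) {
      // No table yet: every mode simply creates it.
      tables.put(table, new ArrayList<>(rows));
      return;
    }
    switch (mode) {
      case APPEND:
        existing.addAll(rows); // keep the old rows, add the new ones
        break;
      case OVERWRITE:
        tables.put(table, new ArrayList<>(rows)); // replace the old rows
        break;
      case IGNORE:
        break; // leave the existing table untouched
      case ERROR_IF_EXISTS:
        throw new IllegalStateException("Table already exists: " + table);
    }
  }

  public static void main(String[] args) {
    List<String> rows = new ArrayList<>();
    rows.add("Perrin, Jean Georges");
    save("ch02", rows, Mode.OVERWRITE);
    save("ch02", rows, Mode.APPEND);
    System.out.println(tables.get("ch02"));
  }
}
```

Overwrite suits this article’s example because you can rerun the application without piling up duplicate rows.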
Figure 6. Adding the save operation to our workflow, as you copy the data in the partition (P) to the database (D) at t6, as suggested by the P ➧ D box. Each task opens a connection to the database.
Figure 7 represents your application’s full mental model. It’s important to remember:
- The whole dataset never hit our application (driver): the dataset was split between the partitions on the workers, not on the driver.
- The entire processing took place in the workers.
- The workers saved the data in their partition to the database. In this scenario, you had four partitions; this means four connections to the database when you saved the data. Imagine a similar scenario with 200k tasks trying first to connect to the database and then inserting data. A fine-tuned database server refuses too many connections, which requires more control in the application. A solution to this load issue involves repartitioning and the options allowed when exporting to a database.
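The repartitioning idea in the last point can be sketched in plain Java. If each partition opens its own connection, merging partitions before the save caps the number of connections. In Spark, Dataset.coalesce(int) and Dataset.repartition(int) play this role; the round-robin merge below is a simplified stand-in with a hypothetical helper name, not Spark’s algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CoalesceSketch {

  // Merges partitions so that at most `max` remain; no record is dropped.
  static <T> List<List<T>> coalesce(List<List<T>> partitions, int max) {
    if (max < 1) {
      throw new IllegalArgumentException("max must be >= 1");
    }
    if (partitions.size() <= max) {
      return partitions; // already within the limit
    }
    List<List<T>> merged = new ArrayList<>();
    for (int i = 0; i < max; i++) {
      merged.add(new ArrayList<>());
    }
    // Round-robin: partition i is folded into merged partition i % max.
    for (int i = 0; i < partitions.size(); i++) {
      merged.get(i % max).addAll(partitions.get(i));
    }
    return merged;
  }

  public static void main(String[] args) {
    List<List<String>> partitions = Arrays.asList(
        Arrays.asList("Pascal", "Voltaire"),
        Arrays.asList("Perrin"),
        Arrays.asList("Karau"),
        Arrays.asList("Zaharia"));
    List<List<String>> merged = coalesce(partitions, 2);
    // One connection per partition: four before the merge, two after.
    System.out.println(partitions.size() + " -> " + merged.size() + " connections");
  }
}
```

The trade-off is parallelism: fewer partitions mean fewer simultaneous connections, but each remaining task has more rows to insert.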
Figure 7. Our complete mental model describing Spark’s behavior when ingesting a CSV file, transforming its data, and then saving the data to a database. The diagram also illustrates the usage of memory in each worker and the attribution of records in those partitions. The R ➧ P symbol indicates that you’re loading records in the partition and the P ➧ D symbol indicates that you’re copying the data in the partition to the database. Finally, the timeline, at t7, returns to the application: no data has ever been transferred from the worker to the application.
And that’s where we will stop for now.