From Spark with Java by Jean Georges Perrin. You've probably seen a simple use case in which Spark ingests data from a CSV file, performs a simple operation, and stores the result in a database. In this article, you're going to see what happens behind the scenes.
Save 37% off Spark with Java. Just enter code fccperrin into the discount code box at checkout at manning.com.
Walking through your application
This article explores how Spark works behind the scenes. To start, we'll look a little more closely at the first typical operation: the connection to the master. After this non-functional step, we'll walk through the ingestion, the transformation, and, finally, the publishing of the data to the RDBMS.
Connecting to a master
For every Spark application, the first operation is to connect to the Spark master and get a Spark session; it's something you'll do every time. This step is illustrated by the code fragment in listing 1 and by figure 1.
In this context, you’re connecting to Spark in local mode.
Listing 1 Getting a Spark session
SparkSession spark = SparkSession.builder()
    .appName("CSV to DB")
    .master("local")
    .getOrCreate();
Note that SparkSession.builder() uses a fluent (builder) style: you chain the calls instead of creating and wiring intermediate objects by hand, as in:

Object1 o1 = new Object1();
Object2 o2 = o1.getObject2();
o2.set("something");
All the illustrations in this article represent a timeline. At t0, you start your application (your main() function) and at t1, you get your session.
Figure 1 The application (or driver) connects to the master and gets a Spark session. The arrow indicates the flow of sequence: at t0, you start your application and at t1, you get your Spark session.
This first step is always to connect to a master. You can now ask Spark to load the CSV file.
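In listing 1, the master is "local", which runs Spark inside your application's JVM with a single worker thread. If you later want to use every core of your machine, or point the same code at a standalone cluster, only the master string changes. Here's a minimal sketch; the thread wildcard is standard Spark, but the cluster host and port below are placeholders, not values from this article:

// Local mode, one worker thread per available core:
SparkSession spark = SparkSession.builder()
    .appName("CSV to DB")
    .master("local[*]")
    .getOrCreate();

// Standalone cluster (hypothetical host, default port 7077):
// .master("spark://my-master-host:7077")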
Loading or ingesting the CSV file
Loading, ingesting, and reading are synonyms for what you're going to do now: ask Spark to load the data contained in the CSV file. Spark can use distributed ingestion through the different nodes of the cluster.
As you can imagine, as with all good masters, it doesn't do much itself: it relies on slaves or workers. You'll find both terms in the Spark documentation; I suggest you pick one, depending on your level of political correctness. I'll overcome my inherently French indecisiveness and pick one for this article: worker.
In this scenario, you have three workers. Distributed ingestion means you'll ask all three workers to ingest at the same time.
Figure 2 The master knows its workers (or slaves). In this illustration, you have three workers. It’s a logical representation: any worker could be on the same physical node as the master. Each worker has memory, which it uses via partitions.
At t2, the master tells the workers to load the file, as coded in listing 2. Natural questions arise: if you have three workers, which one loads the file? And if they load simultaneously, how do they know where to start and where to finish? Spark ingests the CSV file in a distributed way: the file must be on a shared drive, a distributed file system, or made available through a file-sharing mechanism such as Dropbox, Box, or Nextcloud/Owncloud.
In this context, a partition is a dedicated area in the worker’s memory.
Listing 2 Reading the authors file
Dataset<Row> df = spark.read()
    .format("csv")
    .option("header", "true")
    .load("data/authors.csv");
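Listing 2 only sets the header option, but the CSV data source accepts others. For instance, you could make the field separator explicit or ask Spark to guess the column types; this variant is purely illustrative and not part of the article's example:

Dataset<Row> df = spark.read()
    .format("csv")
    .option("header", "true")       // first line contains column names
    .option("sep", ",")             // explicit field separator
    .option("inferSchema", "true")  // let Spark guess the column types
    .load("data/authors.csv");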
Let's take a second to look at our CSV file (see listing 3). It's a simple file with two columns: lname for the last name and fname for the first name. The first line of the file is a header. It contains six more lines, which become six rows in our dataframe.
Listing 3 A good ol’ CSV file
lname,fname
Pascal,Blaise
Voltaire,François
Perrin,Jean Georges
Maréchal,Pierre Sylvain
Karau,Holden
Zaharia,Matei
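If you want to confirm that the six data lines really became six rows, you can inspect the dataframe right after the load. A small sketch, reusing df from listing 2:

df.printSchema();   // lname and fname, both read as strings
df.show();          // prints the six rows as a small table
System.out.println("Row count: " + df.count());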
The workers create tasks to read the file. Each worker has access to the node’s memory and assigns a memory partition to the task.
Figure 3. Tasks are being created based on the available resources. The worker may create several tasks and assign a memory partition to each task. The solid tasks are running (they also have a green dot), in contrast with non-working tasks (from other applications, for example), which are hollow and have a red dot.
At t4, each task continues by reading a part of the CSV file as illustrated in figure 4. As the task is ingesting its rows, it stores them in a dedicated partition.
Figure 4 shows the records being copied from the CSV file to the partitions during the ingestion process, within the R ➧ P (Record to Partition) box. The memory box, in purple, shows which records end up in which partition on each worker. In this example, record #1, which contains Blaise Pascal, is in the first partition of the first worker.
Figure 4. The ingestion is taking place: each task is now loading some records into its own memory partition, as suggested by the R ➧ P (Record to Partition) box. You can also see the purple box, which contains the records after the ingestion is done.
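If you're curious how the ingested data was actually split on your machine, you can ask the dataframe's underlying RDD for its partition count. A quick sketch, reusing df from listing 2; with a tiny file and a local master, don't be surprised if the answer is 1:

int partitions = df.javaRDD().getNumPartitions();
System.out.println("Number of partitions: " + partitions);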
Transforming your data
After the data has been loaded, at t5, you can process the records. Your operation is fairly simple: add a new column to the dataframe, called name. The full name (column name) is a concatenation of the last name (column lname), a comma, a space, and the first name (column fname). For example, Jean Georges (first name) and Perrin (last name) become Perrin, Jean Georges. Listing 4 describes the process and figure 5 illustrates it.
Listing 4 Adding a column to our dataframe
df = df.withColumn(
    "name",
    concat(df.col("lname"), lit(", "), df.col("fname")));
Figure 5 At t5, Spark adds the transformation step to your flow. Each task continues to perform its work, getting all first and last names from its memory partition and creating the new name.
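For completeness, concat() and lit() in listing 4 are static methods of org.apache.spark.sql.functions, so the listing assumes a static import. You can then peek at the new column to check the transformation; the show(false) call simply prints the values without truncating them:

import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

// After the withColumn() call, the dataframe has a third column:
df.select("name").show(false);   // e.g. "Perrin, Jean Georges"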
You’re now ready for the last operation: saving the result in the database.
Saving the work done in our dataframe to a database
After ingesting the CSV file and transforming the data in the dataframe, it's about time to save your result in the database. The code responsible for this operation is in listing 5 and illustrated in figure 6.
Listing 5 Saving the data to the database
String dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs";
Properties prop = new Properties();
prop.setProperty("driver", "org.postgresql.Driver");
prop.setProperty("user", "jgp");
prop.setProperty("password", "Spark<3Java");
df.write()
    .mode(SaveMode.Overwrite)
    .jdbc(dbConnectionUrl, "ch02", prop);
As you’re certainly familiar with JDBC (Java Database Connectivity), you’ve probably noticed that Spark expects similar information:
- a JDBC connection URL
- the name of a driver
- a user
- and a password
The write() method returns a DataFrameWriter object, on which you can chain a mode() method to specify how to write: here, you'll overwrite the data in the table.
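If you want to double-check the result without leaving Spark, you can read the table back through the same JDBC information. A minimal sketch, reusing dbConnectionUrl and prop from listing 5; it assumes the PostgreSQL database from that listing is actually reachable:

Dataset<Row> check = spark.read()
    .jdbc(dbConnectionUrl, "ch02", prop);  // read the ch02 table back
check.show();
System.out.println("Rows written: " + check.count());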
Figure 6. Adding the save operation to our workflow: you copy the data in the partition (P) to the database (D) at t6, as suggested by the P ➧ D box. Each task opens a connection to the database.
Figure 7 represents your application’s full mental model. It’s important to remember:
- The whole dataset never hit your application (driver): the dataset was split among the partitions on the workers, not on the driver.
- The entire processing took place in the workers.
- The workers saved the data in their partitions to the database. In this scenario, you had four partitions, which means four connections to the database when you saved the data. Imagine a similar scenario with 200,000 tasks, each trying first to connect to the database and then to insert data: a well-tuned database server would refuse that many connections, so you need more control in the application. A solution to this load issue involves repartitioning and the options available when exporting to a database, as sketched after this list.
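One way to keep the number of simultaneous connections under control is to reduce the number of partitions just before writing, or to cap it with the JDBC numPartitions option. A sketch of both approaches, reusing the objects from listing 5; the value 4 is only illustrative, matching the four partitions of this scenario:

// Option 1: shrink the dataframe to at most 4 partitions before writing.
df.coalesce(4)
    .write()
    .mode(SaveMode.Overwrite)
    .jdbc(dbConnectionUrl, "ch02", prop);

// Option 2: let the JDBC data source cap the number of concurrent connections.
df.write()
    .mode(SaveMode.Overwrite)
    .option("numPartitions", "4")
    .jdbc(dbConnectionUrl, "ch02", prop);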
Figure 7. Our complete mental model describing Spark's behavior when ingesting a CSV file, transforming its data, and then saving the data to a database. The diagram also illustrates the usage of memory in each worker and the attribution of records to those partitions. The R ➧ P symbol indicates that you're loading records into the partition, and the P ➧ D symbol indicates that you're copying the data in the partition to the database. Finally, the timeline, at t7, returns to the application: no data has ever been transferred from the worker to the application.
And that’s where we will stop for now.
If you want to learn more about the book, check it out on liveBook here and see this slide deck.