
When you start working with Spark Core, RDD is one of the first topics you stumble upon. What is an RDD? Why do we need RDDs? These are some of the questions we come across.

What is an RDD?

RDD stands for Resilient Distributed Dataset. RDDs can be thought of simply as blocks of data.

An RDD is similar to a directory in a file system, a relation in Pig, a table in a database or Hive, and a block in HDFS.

Features of RDD:

Any Spark system can work on an RDD generated by another system, which allows us to co-locate processing code with the data.

Data in Spark is represented using RDDs (Resilient Distributed Datasets), which are an abstraction over a collection of items.

An RDD is distributed across the cluster, so each node stores and manages a subset of the items in the RDD.

RDDs can be created from many sources, such as regular Scala collections or data stored in HDFS.

RDDs can live in memory, on disk, or both (a short sketch after this list shows these features in code).
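As a quick illustration, here is a minimal sketch for the spark-shell, where sc is the SparkContext; the HDFS path is hypothetical. It creates RDDs from a Scala collection and from a file, and keeps one of them cached:

// Create an RDD from a regular Scala collection
val numbers = sc.parallelize(Seq(1, 2, 3, 4, 5))

// Create an RDD from a file in HDFS (hypothetical path)
val lines = sc.textFile("hdfs:///data/input.txt")

// Keep the RDD in memory, spilling to disk if it does not fit
import org.apache.spark.storage.StorageLevel
lines.persist(StorageLevel.MEMORY_AND_DISK)

// Actions such as sum and count trigger the actual computation
println(numbers.sum())
println(lines.count())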

Why do we need RDDs?

Spark works over HDFS and does not store data itself, so why do we need an RDD? Why indeed?

Spark came into the picture because of the limitations of MapReduce.

The limitations of MapReduce are:

To write MapReduce jobs we need knowledge of Java.

Even a simple MapReduce job typically needs 40+ lines of code.

The time taken to perform a job in MapReduce is comparatively high.

More lines of code also leave more room for bugs and make debugging harder.

Spark, on the other hand, uses RAM (in-memory processing), so jobs take comparatively less time.

It eliminates the need for separate mapper and reducer classes, so programs need far fewer lines (the word count sketch after this list shows how compact a job can be).

Spark programs can be written in Scala, Java, or Python.

To manage data between HDFS and memory, Spark uses RDDs.

So, we need RDDs to hold data from HDFS/the cluster in RAM and process it there.
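As a rough sketch of how compact a Spark job can be, here is the classic word count in Scala (the input path is hypothetical):

// Read the input file and split each line into words
val words = sc.textFile("hdfs:///data/words.txt").flatMap(_.split(" "))

// Pair each word with 1 and sum the counts per word
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Print the (word, count) pairs
counts.collect.foreach(println)

The equivalent MapReduce job would need a Mapper class, a Reducer class, and a driver, which is where the 40+ lines come from.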

How to perform joins in Spark Core?

A join is used when we need to analyze data from multiple tables/files.

Joins are an important part of data processing and analysis.

To perform joins in Spark Core we can use the join method. However, because Spark works with structured, semi-structured, and unstructured data, we cannot join RDDs directly: we first have to shape each RDD into (key, value) pairs, with the join key as the key.
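The join method is defined on pair RDDs, that is, RDDs of (key, value) tuples, and it matches elements that share the same key. Here is a minimal sketch with made-up data; the steps below show how to get a real dataset into this shape using case classes:

// Two pair RDDs keyed by class
val studentsByClass = sc.parallelize(Seq(("A", "Alex"), ("B", "Sachin")))
val teachersByClass = sc.parallelize(Seq(("A", "Arun"), ("B", "Amrinder")))

// join pairs up the values that share a key
studentsByClass.join(teachersByClass).collect.foreach(println)
// prints (A,(Alex,Arun)) and (B,(Sachin,Amrinder))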

Steps to perform a join:

  1. Load the data

val rdd_name1 = sc.textFile("File1")

Here, sc is the SparkContext, which is used to read and write data from files.

  2. Create a case class

case class class_name1(attribute1: Type1, attribute2: Type2, ...)

A case class is used to define the schema for the RDD.

  3. Map the RDD with the case class

val rdd_name2 = rdd_name1.map(_.split("delimiter")).map(r => (common column, case class object))

Here, _ is a placeholder that represents each line of the file, which is split using the delimiter. The common column is the value shared by both RDDs (the join key), and the case class object maps the split columns onto the fields of the case class.

  4. Join

val joined_rdd = rdd_name2.join(other_rdd)

  5. Display

joined_rdd.collect.foreach(println)

Example:

Consider Student.txt

1,Alex,21,1234567890,Hyd,89,A

2,Sachin,22,9876543210,Kol,78,B

3,Raju,22,9898767654,Del,90,C

4,Christa,21,7689987654,Pun,93,A

5,Pam,23,2345678990,Bhu,75,A

6,Angela,23,1212121212,Che,87,B

7,Kamla,24,9434345256,Tri,83,C 

8,Kumal,24,1223344556,Che,72,C

And Class.txt

A,Arun

B,Amrinder

C,Alex

The schema for Student.txt is (Stud_id, Stud_name, Age, Mobile, City, Percentage, Class)

and that for Class.txt is (class, class_teacher).

Problem:

Find the class teacher of each student.

Solution:

val student = sc.textFile("Student.txt")

val class_rdd = sc.textFile("Class.txt")

case class students(Stud_id: Int, Stud_name: String, Age: Int, Mobile: Long, City: String, Percentage: Int, Class: String)

case class class_sc(class_name: String, class_teacher: String)

(class is a reserved word in Scala, so the RDD and the case class field are named class_rdd and class_name instead.)

val student1 = student.map(_.split(",")).map(r => (r(6), students(r(0).toInt, r(1), r(2).toInt, r(3).toLong, r(4), r(5).toInt, r(6))))

val class1 = class_rdd.map(_.split(",")).map(r => (r(0), class_sc(r(0), r(1))))

val student_join = student1.join(class1)

student_join.collect.foreach(println)

Output (order may vary):

(A,(students(1,Alex,21,1234567890,Hyd,89,A),class_sc(A,Arun)))

(B,(students(2,Sachin,22,9876543210,Kol,78,B),class_sc(B,Amrinder)))

(C,(students(3,Raju,22,9898767654,Del,90,C),class_sc(C,Alex)))

(A,(students(4,Christa,21,7689987654,Pun,93,A),class_sc(A,Arun)))

(A,(students(5,Pam,23,2345678990,Bhu,75,A),class_sc(A,Arun)))

(B,(students(6,Angela,23,1212121212,Che,87,B),class_sc(B,Amrinder)))

(C,(students(7,Kamla,24,9434345256,Tri,83,C),class_sc(C,Alex)))

(C,(students(8,Kumal,24,1223344556,Che,72,C),class_sc(C,Alex)))

To fetch specific columns, we can map the joined RDD:

val Required_column = student_join.map(x => (x._2._1.Stud_id, x._1, x._2._1.Stud_name, x._2._2.class_teacher))

Required_column.collect.foreach(println)

Output:

(1,A,Alex,Arun)

(2,B,Sachin,Amrinder)

(3,C,Raju,Alex)

(4,A,Christa,Arun)

(5,A,Pam,Arun)

(6,B,Angela,Amrinder)

(7,C,Kamla,Alex)

(8,C,Kumal,Alex)

To apply a filter condition to the joined RDD we can use the filter function:

val filter_student = student_join.filter(x=> x._2._1.Percentage >= 90)

val Required_column = filter_student.map(x => (x._2._1.Stud_id, x._1, x._2._1.Stud_name, x._2._2.class_teacher))

Required_column.collect.foreach(println)

Output:

(3,C,Raju,Alex)

(4,A,Christa,Arun)

Spark is a major framework for data analysis. It has support for machine learning (MLlib), streaming data (Spark Streaming), and graph-based data (GraphX).

The time taken to perform an operation is also comparatively low.

Happy Learning!

