When you start working with Spark Core, RDD is one of the first topics you stumble upon. What is an RDD? Why do we need it? These are some of the questions we come across.
What is RDD?
RDD stands for Resilient Distributed Dataset. An RDD can simply be thought of as a block of data that is split up and spread across the cluster.
An RDD plays a role similar to a directory in a file system, a relation in Pig, a table in a database or Hive, or a block in HDFS.
Features of RDD:
Any Spark component can work on an RDD generated by another, which allows us to combine different kinds of processing on the same data.
Data in Spark is represented using RDDs (Resilient Distributed Datasets), which are an abstraction over a collection of items.
An RDD is distributed across the cluster, so each node stores and manages a subset of the items in the RDD.
An RDD can be created from many sources, such as a regular Scala collection or data in HDFS, as sketched below.
RDDs can live in memory, on disk, or both.
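A minimal sketch of these creation paths in the Spark shell (the file path here is just a placeholder):
// From a regular Scala collection
val numbers = sc.parallelize(List(1, 2, 3, 4, 5))
// From a file in HDFS (or the local file system)
val lines = sc.textFile("hdfs:///data/sample.txt")
// Keep the RDD in memory across operations
numbers.cache()
numbers.count()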
Why do we need RDD?
Spark works over HDFS and does not store data itself, so why do we need an RDD?
Spark came into the picture because of the limitations of MapReduce.
Limitations of MapReduce:
To write MapReduce jobs we need knowledge of Java.
Even a simple MapReduce job typically needs 40+ lines of code.
A MapReduce job takes comparatively longer to run.
More lines of code also leave more room for bugs and make debugging harder.
Spark uses RAM/in-memory processing of data, so jobs take comparatively less time.
It eliminates the need for separate mapper and reducer classes, which means far fewer lines of code.
Jobs can be written in Scala, Java, or Python.
To manage data between HDFS and memory, Spark uses the RDD.
So, we need RDDs to load data from HDFS/the cluster and process it in RAM, as the sketch below illustrates.
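As a rough illustration of the brevity and in-memory processing mentioned above, here is a word count in the Spark shell (a minimal sketch; the input path is only an example). The equivalent classic MapReduce job needs separate mapper, reducer, and driver classes.
val text = sc.textFile("hdfs:///data/input.txt")
val counts = text.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.cache() // keep the counts in memory for further processing
counts.collect.foreach(println)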
How to perform joins in Spark Core?
A join is used when we need to analyze data from multiple tables/files, and joins are an important part of data processing and analysis.
To perform joins in Spark Core we can use the join method.
However, Spark works with structured, semi-structured, and unstructured data alike, so we cannot join two raw text RDDs directly; each RDD must first be turned into key-value pairs whose key is the join column, as the small sketch below shows.
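For reference, join is defined on pair RDDs: joining an RDD[(K, V)] with an RDD[(K, W)] gives an RDD[(K, (V, W))]. A minimal sketch with made-up values:
val marks = sc.parallelize(Seq(("A", 89), ("B", 78)))
val teachers = sc.parallelize(Seq(("A", "Arun"), ("B", "Amrinder")))
// join pairs by key: the result is an RDD[(String, (Int, String))]
marks.join(teachers).collect.foreach(println)
// e.g. (A,(89,Arun)) and (B,(78,Amrinder))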
Steps to perform a join:
- Load the data
val rdd_name1 = sc.textFile("File1")
Here, sc is the SparkContext, which is used to read data from files into RDDs.
- Create a case class
case class class_name1(attribute1: DataType1, attribute2: DataType2, ...)
A case class is used to define the schema for the RDD.
- Map the RDD to the case class
val rdd_name2 = rdd_name1.map(_.split("delimiter")).map(r => (common_column, case_class_object))
Here, _ is a placeholder that represents each line of the file, which is broken into fields by the split method. The common column is the value shared by both RDDs (the join key), and the case class object maps the split fields onto the case class.
- Join
val joined_rdd = rdd_name2.join(other_rdd)
- Display the result
joined_rdd.collect.foreach(println)
Example:
Consider Student.txt
1,Alex,21,1234567890,Hyd,89,A
2,Sachin,22,9876543210,Kol,78,B
3,Raju,22,9898767654,Del,90,C
4,Christa,21,7689987654,Pun,93,A
5,Pam,23,2345678990,Bhu,75,A
6,Angela,23,1212121212,Che,87,B
7,Kamla,24,9434345256,Tri,83,C
8,Kumal,24,1223344556,Che,72,C
And Class.txt
A,Arun
B,Amrinder
C,Alex
Schema for student is (Stud_id,Stud_name,Age,Mobile,City,Percentage,Class)
And that for class is (class, class_teacher)
Problem:
Find the class teacher of each student.
Solution:
val student = sc.textFile("student.txt")
// "class" is a reserved word in Scala, so the RDD and the field use different names
val classes = sc.textFile("class.txt")
case class students(Stud_id: Int, Stud_name: String, Age: Int, Mobile: Long, City: String, Percentage: Int, Class: String)
case class class_sc(class_id: String, class_teacher: String)
// key each RDD by the class column, which is the join key
val student1 = student.map(_.split(",")).map(r => (r(6), students(r(0).toInt, r(1), r(2).toInt, r(3).toLong, r(4), r(5).toInt, r(6))))
val class1 = classes.map(_.split(",")).map(r => (r(0), class_sc(r(0), r(1))))
val student_join = student1.join(class1)
student_join.collect.foreach(println)
Output (order may vary):
(A,(students(1,Alex,21,1234567890,Hyd,89,A),class_sc(A,Arun)))
(B,(students(2,Sachin,22,9876543210,Kol,78,B),class_sc(B,Amrinder)))
(C,(students(3,Raju,22,9898767654,Del,90,C),class_sc(C,Alex)))
(A,(students(4,Christa,21,7689987654,Pun,93,A),class_sc(A,Arun)))
(A,(students(5,Pam,23,2345678990,Bhu,75,A),class_sc(A,Arun)))
(B,(students(6,Angela,23,1212121212,Che,87,B),class_sc(B,Amrinder)))
(C,(students(7,Kamla,24,9434345256,Tri,83,C),class_sc(C,Alex)))
(C,(students(8,Kumal,24,1223344556,Che,72,C),class_sc(C,Alex)))
To fetch specific columns we can map the joined RDD. In each joined element, x._1 is the key, x._2._1 is the students record, and x._2._2 is the class_sc record:
val Required_column = student_join.map(x => (x._2._1.Stud_id, x._1, x._2._1.Stud_name, x._2._2.class_teacher))
Required_column.collect.foreach(println)
Output:
(1,A,Alex,Arun)
(2,B,Sachin,Amrinder)
(3,C,Raju,Alex)
(4,A,Christa,Arun)
(5,A,Pam,Arun)
(6,B,Angela,Amrinder)
(7,C,Kamla,Alex)
(8,C,Kumal,Alex)
To apply a filter condition on the joined RDD we can use the filter function:
val filter_student = student_join.filter(x => x._2._1.Percentage >= 90)
val Required_column = filter_student.map(x => (x._2._1.Stud_id, x._1, x._2._1.Stud_name, x._2._2.class_teacher))
Required_column.collect.foreach(println)
Output:
(3,C,Raju,Alex)
(4,A,Christa,Arun)
Spark is a major framework for data analysis. It has support for machine learning (MLlib), streaming data (Spark Streaming), and graph-based data (GraphX).
It also takes comparatively less time to perform operations.
Happy Learning!