
PySpark RDD – Transformations

In Python, PySpark is a Spark module that provides the same kind of data processing as Spark.

RDD stands for Resilient Distributed Dataset. It is the fundamental data structure in Apache Spark.

We need to import RDD from the pyspark.rdd module.

In PySpark, to create an RDD, we can use the parallelize() method.

Syntax:

spark_app.sparkContext.parallelize(data)

Where,

data can be one-dimensional (linear) data or two-dimensional (row-column) data.
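For reference, here is a minimal sketch of both kinds of data being parallelized (the app name and the sample values are only illustrative):

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# one dimensional (linear) data
linear_rdd = spark_app.sparkContext.parallelize([89, 76, 78, 89, 90])

# two dimensional (row-column) data: each element is a row
row_column_rdd = spark_app.sparkContext.parallelize([['sravan', 89], ['ojaswi', 76]])

#display both RDDs
print(linear_rdd.collect())
print(row_column_rdd.collect())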

RDD Transformations:

An RDD transformation is an operation applied to an RDD to create a new RDD from the existing one. Using transformations, we can, for example, map or filter the elements of an RDD.

Let’s see the transformations that are performed on the given RDD.

We will discuss them one by one.

1. map()

The map() transformation applies a function to every element in the RDD. It takes an anonymous function (a lambda) as a parameter and transforms each element of the RDD.

Syntax:

RDD_data.map(anonymous_function)

Parameters:

anonymous_function looks like:

lambda element:operation

For example, the operation might add a value to, or subtract a value from, each element.
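The operation is not limited to arithmetic; the lambda can return any value computed from the element. As a small illustrative sketch (assuming the student_marks RDD created in the examples below), each mark could be mapped to a (mark, pass/fail) pair:

# hypothetical sketch: map each mark to a (mark, label) pair
labels = student_marks.map(lambda element: (element, 'pass' if element >= 40 else 'fail'))
print(labels.collect())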

Let’s see the examples to understand this transformation better.

Example 1:

In this example, we create an RDD named student_marks with 20 elements, apply the map() transformation to add 20 to each element, and display the result using the collect() action.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

# import RDD from pyspark.rdd

from pyspark.rdd import RDD

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student marks data with 20 elements

student_marks =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

#display data in RDD

print("Actual data in RDD: ",student_marks.map(lambda element: element).collect())

#Apply map() transformation by adding 20 to each element in RDD

print("After adding 20 to each element in RDD:",student_marks.map(lambda element: element+ 20).collect())

Output:

Actual data in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

After adding 20 to each element in RDD: [109, 96, 98, 109, 110, 120, 54, 76, 74, 42, 65, 63, 43, 76, 98, 41, 54, 54, 76, 54]

From the above output, we can see that 20 is added to every element in the RDD through the lambda function using the map() transformation.

Example 2:

In this example, we create an RDD named student_marks with 20 elements, apply the map() transformation to subtract 15 from each element, and display the result using the collect() action.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

# import RDD from pyspark.rdd

from pyspark.rdd import RDD

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student marks data with 20 elements

student_marks =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

#display data in RDD

print("Actual data in RDD: ",student_marks.map(lambda element: element).collect())

#Apply map() transformation by subtracting 15 from each element in RDD

print("After subtracting 15 from each element in RDD:",student_marks.map(lambda element: element-15).collect())

Output:

Actual data in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

After subtracting 15 from each element in RDD: [74, 61, 63, 74, 75, 85, 19, 41, 39, 7, 30, 28, 8, 41, 63, 6, 19, 19, 41, 19]

From the above output, we can see that 15 is subtracted from every element in the RDD through the lambda function using the map() transformation.

2. filter()

The filter() transformation is used to filter values from the RDD. It takes an anonymous function (a lambda) and returns only the elements that satisfy the given condition.

Syntax:

RDD_data.filter(anonymous_function)

Parameters:

anonymous_function looks like:

lambda element:condition/expression

For example, the condition is a boolean expression that decides which elements are kept in the filtered RDD.
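The condition can also combine several comparisons. As a small illustrative sketch (assuming the student_marks RDD created in the examples below), the elements between 40 and 80 could be kept like this:

# hypothetical sketch: keep marks in the range 40 to 80 (inclusive)
in_range = student_marks.filter(lambda element: 40 <= element <= 80)
print(in_range.collect())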

Let’s see examples to understand this transformation better.

Example 1:

In this example, we create an RDD named student_marks with 20 elements, apply the filter() transformation to keep only the multiples of 5, and display them using the collect() action.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

# import RDD from pyspark.rdd

from pyspark.rdd import RDD

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create student marks data with 20 elements

student_marks =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

 

#display data in RDD

print("Actual data in RDD: ",student_marks.map(lambda element: element).collect())

 

#Apply filter() transformation by returning only multiples of 5

print("Multiples of 5 from an RDD: ",student_marks.filter(lambda element: element%5==0).collect())


Output:

Actual data in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Multiples of 5 from an RDD: [90, 100, 45]

From the above output, we can see that the elements that are multiples of 5 are filtered from the RDD.

Example 2:

In this example, we create an RDD named student_marks with 20 elements, apply the filter() transformation to keep only the elements greater than 45, and display them using the collect() action.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

# import RDD from pyspark.rdd

from pyspark.rdd import RDD

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create student marks data with 20 elements

student_marks =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

 

#display data in RDD

print("Actual data in RDD: ",student_marks.map(lambda element: element).collect())

 

#Apply filter() transformation by filtering values greater than 45

print("Values greater than 45: ",student_marks.filter(lambda element: element>45).collect())

Output:

Actual data in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Values greater than 45: [89, 76, 78, 89, 90, 100, 56, 54, 56, 78, 56]

From the above output, we can see that the elements greater than 45 are filtered from the RDD.

3. union()

The union() transformation is used to combine two RDDs. It returns a new RDD that contains the elements of both RDDs.

Syntax:

RDD_data1.union(RDD_data2)
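Note that union() keeps duplicate elements from both RDDs. A quick illustrative sketch (assuming an existing spark_app session; the values are made up):

rdd1 = spark_app.sparkContext.parallelize([1, 2, 3])
rdd2 = spark_app.sparkContext.parallelize([3, 4])

# the duplicate 3 appears twice in the result: [1, 2, 3, 3, 4]
print(rdd1.union(rdd2).collect())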

Let’s see examples to understand this transformation better.

Example 1:

In this example, we create a single RDD with student marks data and derive two RDDs from it by filtering some values using the filter() transformation. After that, we perform the union() transformation on the two filtered RDDs.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

# import RDD from pyspark.rdd

from pyspark.rdd import RDD

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create student marks data with 20 elements

student_marks =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

 

#display data in RDD

print("Actual data in RDD: ",student_marks.map(lambda element: element).collect())

 

first_filter = student_marks.filter(lambda element: element >90)

second_filter = student_marks.filter(lambda element: element <40)

#display first filtered transformation

print("Elements in RDD greater than 90 ",first_filter.collect())

#display second filtered transformation

print("Elements in RDD less than 40 ",second_filter.collect())

#Apply union() transformation by performing union on the above 2 filters

print("Union Transformation on two filtered data",first_filter.union(second_filter).collect())

Output:

Actual data in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Elements in RDD greater than 90 [100]

Elements in RDD less than 40 [34, 22, 23, 21, 34, 34, 34]

Union Transformation on two filtered data [100, 34, 22, 23, 21, 34, 34, 34]

From the above output, you can see that we performed union on first_filter and second_filter.

first_filter is obtained by taking the elements of the student_marks RDD that are greater than 90, and second_filter is obtained by taking the elements of the student_marks RDD that are less than 40, both using the filter() transformation.

Example 2:

In this example, we will create two RDDs such that the first RDD has 20 elements and the second RDD has 10 elements. Following that, we can apply a union() transformation to these two RDDs.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

# import RDD from pyspark.rdd

from pyspark.rdd import RDD

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create student marks data with 20 elements

student_marks1 =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

 

 

# create student marks data with 10 elements

student_marks2 =spark_app.sparkContext.parallelize([45,43,23,56,78,21,34,34,56,34])

 

#display data in RDD

print("Actual data in student marks 1 RDD: ",student_marks1.map(lambda element: element).collect())

#display data in RDD

print("Actual data in student marks 2 RDD: ",student_marks2.map(lambda element: element).collect())

#Apply union() transformation by performing union on the above 2 RDD's

print("Union Transformation on two RDD ",student_marks1.union(student_marks2).collect())

Output:

Actual data in student marks 1 RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Actual data in student marks 2 RDD: [45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Union Transformation on two RDD [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

We can see that the two RDDs are combined using the union() transformation.

Conclusion

From this PySpark tutorial, we saw three transformations applied to an RDD: map() transforms each element in an RDD, filter() performs filtering and creates a new filtered RDD from the existing one, and union() combines two RDDs into a single RDD.
