Apache Spark

PySpark RDD – intersection(), cartesian()

In Python, PySpark is a Spark module that provides Spark-like processing using DataFrames and RDDs.

RDD stands for Resilient Distributed Dataset, a fundamental data structure in Apache Spark. An RDD can store data linearly (one-dimensional) or in rows and columns, similar to a DataFrame.

We need to import RDD from the pyspark.rdd module.

In PySpark, we use the parallelize() method to create an RDD from data.

Syntax:

spark_app.sparkContext.parallelize(data)

Where,

data can be one-dimensional (linear data) or two-dimensional (row-column data).

In this tutorial, we will learn about the PySpark RDD intersection() and cartesian() operations.

PySpark RDD – intersection()

intersection() in RDD returns a new RDD that includes only the elements present in both the first RDD and the second RDD. Simply put, it returns the elements common to both RDDs.

So, we need two RDDs to perform this operation.

Syntax:

RDD_data1.intersection(RDD_data2)

Where,

  1. RDD_data1 is the first RDD.
  2. RDD_data2 is the second RDD.

Example 1:

In this example, we will create two RDDs with numeric data – subjects_1 and subjects_2 – and perform intersection() on them.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create 5 subject marks
subjects_1 = spark_app.sparkContext.parallelize([100, 34, 56, 54, 45])

#display subjects_1 RDD
print("subjects_1 RDD: ", subjects_1.collect())

# create 5 subject marks
subjects_2 = spark_app.sparkContext.parallelize([90, 89, 34, 56, 45])

#display subjects_2 RDD
print("subjects_2 RDD: ", subjects_2.collect())

#perform intersection() operation on the above two RDDs
print("intersection() operation on subjects_1 and subjects_2 : ", subjects_1.intersection(subjects_2).collect())

Output:

subjects_1 RDD: [100, 34, 56, 54, 45]
subjects_2 RDD: [90, 89, 34, 56, 45]
intersection() operation on subjects_1 and subjects_2 : [56, 45, 34]

From the output, we can see that there are 5 elements in both the RDDs.

We perform intersection() on subjects_1 and subjects_2. Here, 56, 45, and 34 are present in both subjects_1 and subjects_2, so they are returned.

Example 2:

In this example, we will create two RDDs with string data – subjects_1 and subjects_2 – and perform intersection() on them.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create 3 subject names
subjects_1 = spark_app.sparkContext.parallelize(["linux", "bash", "javascript"])

#display subjects_1 RDD
print("subjects_1 RDD: ", subjects_1.collect())

# create 2 subject names
subjects_2 = spark_app.sparkContext.parallelize(["linux", "java"])

#display subjects_2 RDD
print("subjects_2 RDD: ", subjects_2.collect())

#perform intersection() operation on the above two RDDs
print("intersection() operation on subjects_1 and subjects_2 : ", subjects_1.intersection(subjects_2).collect())

Output:

subjects_1 RDD: ['linux', 'bash', 'javascript']
subjects_2 RDD: ['linux', 'java']
intersection() operation on subjects_1 and subjects_2 : ['linux']

From the output, we can see that only 'linux' is common to both RDDs, so it is returned.
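One detail not shown in the examples above: intersection() also removes duplicates from its result, much like Python's built-in set intersection. A minimal pure-Python sketch of the same semantics (not PySpark code, just an analogy):

```python
# intersection() behaves like a set intersection: only common
# elements survive, and duplicates are removed from the result.
marks_1 = [100, 34, 56, 54, 45, 45]  # note the duplicate 45
marks_2 = [90, 89, 34, 56, 45]

common = sorted(set(marks_1) & set(marks_2))
print(common)  # [34, 45, 56]
```

Note that Spark does not guarantee the order of the returned elements, which is why the outputs above appear in an arbitrary order.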

PySpark RDD – cartesian()

cartesian() in RDD returns a new RDD built from the elements of both RDDs. It computes the Cartesian product: each element of the first RDD is paired with every element of the second RDD.

So, we need two RDDs to perform this operation.

Syntax:

RDD_data1.cartesian(RDD_data2)

Where,

  1. RDD_data1 is the first RDD.
  2. RDD_data2 is the second RDD.

Example 1:

In this example, we will create two RDDs with numeric data – subjects_1 and subjects_2 – and perform cartesian() on them.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create 2 subject marks
subjects_1 = spark_app.sparkContext.parallelize([100, 34])

#display subjects_1 RDD
print("subjects_1 RDD: ", subjects_1.collect())

# create 2 subject marks
subjects_2 = spark_app.sparkContext.parallelize([56, 45])

#display subjects_2 RDD
print("subjects_2 RDD: ", subjects_2.collect())

#perform cartesian() operation on the above two RDDs
print("cartesian() operation on subjects_1 and subjects_2 : ", subjects_1.cartesian(subjects_2).collect())

Output:

subjects_1 RDD: [100, 34]
subjects_2 RDD: [56, 45]
cartesian() operation on subjects_1 and subjects_2 : [(100, 56), (100, 45), (34, 56), (34, 45)]

From the output, we can see that there are 2 elements in both the RDDs.

We perform cartesian() on subjects_1 and subjects_2. Each element of subjects_1 is paired with every element of subjects_2.

Example 2:

In this example, we will create two RDDs with string data – subjects_1 and subjects_2 – and perform cartesian() on them.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create 3 subject names
subjects_1 = spark_app.sparkContext.parallelize(["linux", "bash", "javascript"])

#display subjects_1 RDD
print("subjects_1 RDD: ", subjects_1.collect())

# create 2 subject names
subjects_2 = spark_app.sparkContext.parallelize(["linux", "java"])

#display subjects_2 RDD
print("subjects_2 RDD: ", subjects_2.collect())

#perform cartesian() operation on the above two RDDs
print("cartesian() operation on subjects_1 and subjects_2 : ", subjects_1.cartesian(subjects_2).collect())

Output:

subjects_1 RDD: ['linux', 'bash', 'javascript']
subjects_2 RDD: ['linux', 'java']
cartesian() operation on subjects_1 and subjects_2 : [('linux', 'linux'), ('linux', 'java'), ('bash', 'linux'), ('javascript', 'linux'), ('bash', 'java'), ('javascript', 'java')]

From the output, we can see that there are 3 elements in the first RDD and 2 elements in the second RDD.

We perform cartesian() on subjects_1 and subjects_2. Here, each element of subjects_1 is paired with every element of subjects_2.
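The pairing cartesian() performs corresponds to Python's itertools.product, which is a handy way to check the expected pairs locally (a pure-Python analogy, not PySpark code):

```python
from itertools import product

# cartesian() pairs every element of the first RDD with every
# element of the second; itertools.product does the same for lists.
marks_1 = [100, 34]
marks_2 = [56, 45]

pairs = list(product(marks_1, marks_2))
print(pairs)  # [(100, 56), (100, 45), (34, 56), (34, 45)]
```

Keep in mind that Spark may return the pairs in a different order depending on partitioning, as the second example above shows, so compare results as sets when verifying.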

Conclusion

In this PySpark RDD tutorial, we saw how to perform the intersection() and cartesian() operations. Both require two RDDs. intersection() returns only the elements common to both RDDs, while cartesian() returns the Cartesian product, pairing each element of the first RDD with every element of the second.

About the author

Gottumukkala Sravan Kumar

B.Tech (Hons.) in Information Technology; known programming languages: Python, R, PHP, MySQL; published 500+ articles in the computer science domain.