Apache Spark

PySpark RDD – subtract(), distinct()

In Python, PySpark is the Spark module that provides Spark-like processing, including DataFrames. RDD stands for Resilient Distributed Dataset, a fundamental data structure in Apache Spark. Unlike a DataFrame, an RDD is an immutable, distributed collection of elements; it has no fixed row-and-column schema.

In this article, we will explain the subtract() and distinct() methods of the PySpark RDD. Both are borrowed from set operations familiar from SQL and are used to select specific subsets of data: subtract() compares two RDDs and returns their difference, while distinct() returns only the unique values within a single RDD.

To demonstrate, we import RDD from the pyspark.rdd module. In PySpark, we create an RDD from an in-memory collection with the parallelize() method.

Syntax:

spark_app.sparkContext.parallelize(data)

Where data can be one-dimensional (a flat list of values) or two-dimensional (row-like data such as tuples or lists).
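A minimal sketch of both forms is shown below (the RDD names here are only illustrative; the SparkSession is created the same way as in the full examples later in this article):

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# one-dimensional data: a flat list of marks
marks_rdd = spark_app.sparkContext.parallelize([100, 34, 56])

# two-dimensional data: a list of (name, mark) rows
rows_rdd = spark_app.sparkContext.parallelize([("linux", 100), ("bash", 34)])

print(marks_rdd.collect())   # [100, 34, 56]
print(rows_rdd.collect())    # [('linux', 100), ('bash', 34)]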

In this tutorial, we will look at the PySpark RDD subtract() and distinct() operations.

PySpark RDD – subtract()

subtract() on an RDD is similar to the set difference operation: it returns a new RDD containing the elements that are present in the first RDD but not in the second RDD. So, we need two RDDs to perform this operation.

Syntax:

RDD_data1.subtract(RDD_data2)

Where:

  1. RDD_data1 is the first RDD.
  2. RDD_data2 is the second RDD.
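As a quick sketch of the behavior before the full example (the RDD names a and b are only illustrative, and spark_app is a SparkSession created as in the examples below):

a = spark_app.sparkContext.parallelize([1, 2, 3])
b = spark_app.sparkContext.parallelize([2, 3, 4])

# elements of a that are not in b (set difference)
print(a.subtract(b).collect())   # [1]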

Example 1:

In this example, we will create two RDDs with numeric data – subjects_1 and subjects_2 – and perform subtract() on them.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create an RDD with 5 subject marks
subjects_1 = spark_app.sparkContext.parallelize([100,34,56,54,45])

#display subjects_1 RDD
print("subjects_1 RDD: ",subjects_1.collect())

# create an RDD with 5 subject marks
subjects_2 = spark_app.sparkContext.parallelize([90,89,34,56,45])

#display subjects_2 RDD
print("subjects_2 RDD: ",subjects_2.collect())

#perform subtract() operation on above two RDD's
print("subtract() operation on subjects_1 with subjects_2 : ",subjects_1.subtract(subjects_2).collect())

#perform subtract() operation on above two RDD's
print("subtract() operation on subjects_2 with subjects_1 : ",subjects_2.subtract(subjects_1).collect())

Output:

subjects_1 RDD: [100, 34, 56, 54, 45]
subjects_2 RDD: [90, 89, 34, 56, 45]
subtract() operation on subjects_1 with subjects_2 : [100, 54]
subtract() operation on subjects_2 with subjects_1 : [89, 90]

From the output, we can see that each RDD has 5 elements.

In the first operation, we perform subtract on subjects_1 with subjects_2. Here, 100 and 54 are present in subjects_1 but not in subjects_2, so they are returned.

In the second operation, we perform subtract on subjects_2 with subjects_1. Here, 89 and 90 are present in subjects_2 but not in subjects_1, so they are returned.
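Note that subtract() shuffles the data, so the order of the returned elements is not guaranteed (89 and 90 come back in a different order than they were defined). If a stable order is needed for display, the collected list can simply be sorted on the driver, for example with the RDDs from the example above:

# sort the collected difference for a stable, readable output
result = sorted(subjects_2.subtract(subjects_1).collect())
print(result)   # [89, 90]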

Example 2:

In this example, we will create two RDDs with string data – subjects_1 and subjects_2 – and perform subtract() on them.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create an RDD with 3 subject names
subjects_1 = spark_app.sparkContext.parallelize(["linux","bash","javascript"])

#display subjects_1 RDD
print("subjects_1 RDD: ",subjects_1.collect())

# create an RDD with 2 subject names
subjects_2 = spark_app.sparkContext.parallelize(["linux","java"])

#display subjects_2 RDD
print("subjects_2 RDD: ",subjects_2.collect())

#perform subtract() operation on above two RDD's
print("subtract() operation on subjects_1 with subjects_2 : ",subjects_1.subtract(subjects_2).collect())

#perform subtract() operation on above two RDD's
print("subtract() operation on subjects_2 with subjects_1 : ",subjects_2.subtract(subjects_1).collect())

Output:

subjects_1 RDD: ['linux', 'bash', 'javascript']
subjects_2 RDD: ['linux', 'java']
subtract() operation on subjects_1 with subjects_2 : ['bash', 'javascript']
subtract() operation on subjects_2 with subjects_1 : ['java']

From the output, we can see that subjects_1 has 3 elements and subjects_2 has 2. In the first operation, we perform subtract on subjects_1 with subjects_2.

Here, 'bash' and 'javascript' are present in subjects_1 but not in subjects_2, so they are returned.

In the second operation, we perform subtract on subjects_2 with subjects_1. Here, 'java' is present in subjects_2 but not in subjects_1, so it is returned.

PySpark RDD – distinct()

distinct() on an RDD returns only the unique values from that RDD, removing any duplicates. It is applied to a single RDD, so we need only one RDD to perform this operation, and it takes no parameters.

Syntax:

RDD_data.distinct()

Where RDD_data is the RDD.

Example 1:

In this example, we will create one RDD, subjects_1, with 10 numeric values and return the unique values by applying the distinct() operation.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create an RDD with 10 subject marks
subjects_1 = spark_app.sparkContext.parallelize([34,56,54,45,45,56,54,4,3,3])

#display subjects_1 RDD
print("subjects_1 RDD: ",subjects_1.collect())

#perform distinct() operation on above RDD.
print("distinct() operation on subjects_1 : ",subjects_1.distinct().collect())

Output:

subjects_1 RDD: [34, 56, 54, 45, 45, 56, 54, 4, 3, 3]
distinct() operation on subjects_1 : [34, 56, 54, 4, 45, 3]

We created an RDD with 10 integer values that include duplicates. After that, we applied distinct() to return only the unique values.

Example 2:

In this example, we will create one RDD, subjects_1, with 5 string values and return the unique values by applying the distinct() operation.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create an RDD with 5 subject names
subjects_1 = spark_app.sparkContext.parallelize(['java','java','python','javascript','javascript'])

#display subjects_1 RDD
print("subjects_1 RDD: ",subjects_1.collect())

#perform distinct() operation on above RDD.
print("distinct() operation on subjects_1 : ",subjects_1.distinct().collect())

Output:

subjects_1 RDD: ['java', 'java', 'python', 'javascript', 'javascript']

distinct() operation on subjects_1 : ['java', 'python', 'javascript']

We created an RDD with 5 string values that include duplicates. After that, we applied distinct() to return only the unique values. The returned unique values are 'java', 'python', and 'javascript'.
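If only the number of unique values is needed rather than the values themselves, distinct() can be chained with count(). A minimal sketch using the subjects_1 RDD from the example above:

# count the unique subject names instead of collecting them
print("number of unique subjects: ", subjects_1.distinct().count())   # 3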

Conclusion

In this PySpark RDD tutorial, we discussed the subtract() and distinct() methods. subtract() is applied on two RDDs and returns the elements present in the first RDD but not in the second. distinct() is applied on a single RDD and returns the unique elements from that RDD.

About the author

Gottumukkala Sravan Kumar

B.Tech (Hons) in Information Technology; known programming languages: Python, R, PHP, MySQL; published 500+ articles in the computer science domain.