Apache Spark

PySpark Pair RDD – Actions

In Python, PySpark is a Spark module used to provide a similar kind of processing like spark.

RDD stands for Resilient Distributed Datasets. We can call RDD as a fundamental data structure in Apache Spark. Pair RDD stores the elements/values in the form of key-value pairs. It will store the key-value pair in the format (key,value).

We need to import RDD from the pyspark.rdd module.

In PySpark to create an RDD, we can use the parallelize() method.

Syntax:

spark_app.sparkContext.parallelize(data)

Where:

data can be a one dimensional (linear data) or two-dimensional data (row-column data).

RDD Actions:

An action in RDD is an operation that is applied on an RDD to return a single value. In other words, we can say that an action will result from the provided data by doing some operation on the given RDD.

Pair RDD supports only one action. countByKey() is the action that is used in Pair RDD.

countByKey()

As we know Pair RDD has key-value pair elements. countByKey is used to return each key available with total occurrence as value from RDD.

This can be done using the items() method which is a dictionary method in python.

items() is used to extract the key-value pairs from a dictionary. Dictionaries store items in a key-value pair. So, pair RDD is close to the dictionary.

So, this countByKey() action uses the items() method.

Syntax:

Pair_RDD.countByKey().items()

Where Pair_RDD is the pair RDD.

It return the count of values per key in the format – dict_items([(key, value), ……])

We can use a for loop to loop through the keys and values to return separately.

Example:

In this example, we created a Pair RDD named subjects_rating with 6 key value pairs and applied countByKey() action on it. Finally, we are displaying actual action and using a for loop.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

# import RDD from pyspark.rdd

from pyspark.rdd import RDD

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create 6 - subject and rating pairs

subjects_rating =spark_app.sparkContext.parallelize([('python',4),('javascript',2),('linux',5),('C#',4),('javascript',4),('python',3)])

 

#apply countByKey() action on the above subjects_rating pair RDD

dictionary_rdd = subjects_rating.countByKey().items()

#display

print("countByKey() action on RDD-subjects_rating: ",dictionary_rdd)

#get the keys and value counts from the above dictionary rdd

for keys, values in dictionary_rdd:

print(keys,"-->", values)

Output:

countByKey() action on RDD-subjects_rating: dict_items([('python', 2), ('javascript', 2), ('linux', 1), ('C#', 1)])

python --> 2

javascript --> 2

linux --> 1

C# --> 1

In the pair RDD,

  1. key-python occured 2 times, so value for it is returned 2
  2. key-javascript occured 2 times, so value for it is returned 2
  3. Key-linux and key-C# occurred 1 time, so value for it is returned 1

countByKey() action with keys()

If you need to return only keys, then countByKey() action uses the keys() method.

Syntax:

Pair_RDD.countByKey().keys()

Where Pair_RDD is the pair RDD.

It returns the count of values per key in the format – dict_items([key, ……])

We can use a for loop to loop through the keys to return separately.

Example:

In this example, we created a Pair RDD named subjects_rating with 6 key value pairs and applied countByKey() action on it to get only keys. Finally, we are displaying actual action and using a for loop.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

# import RDD from pyspark.rdd

from pyspark.rdd import RDD

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create 6 - subject and rating pairs

subjects_rating =spark_app.sparkContext.parallelize([('python',4),('javascript',2),('linux',5),('C#',4),('javascript',4),('python',3)])

 

#apply countByKey() action on the above subjects_rating pair RDD to get keys

dictionary_rdd = subjects_rating.countByKey().keys()

#display

print("countByKey() action on RDD-subjects_rating: ",dictionary_rdd)

#get the keys from the above dictionary rdd

for keys in dictionary_rdd:

print(keys)

Output:

countByKey() action on RDD-subjects_rating: dict_keys(['python', 'javascript', 'linux', 'C#'])

python

javascript

linux

C#

We can see that only the key is returned.

countByKey() action with values()

If you need to return only total values per key, then countByKey() action uses the values() method.

Syntax:

Pair_RDD.countByKey().values()

Where, Pair_RDD is the pair RDD.

It return the count of values per key in the format – dict_items([value, ……])

We can use a for loop to loop through the values to return separately.

Example:

In this example, we created a Pair RDD named subjects_rating with 6 key value pairs and applied countByKey() action with values() on it to get only values. Finally, we are displaying actual action and using a for loop.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

# import RDD from pyspark.rdd

from pyspark.rdd import RDD

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create 6 - subject and rating pairs

subjects_rating =spark_app.sparkContext.parallelize([('python',4),('javascript',2),('linux',5),('C#',4),('javascript',4),('python',3)])

 

#apply countByKey() action on the above subjects_rating pair RDD to get values

dictionary_rdd = subjects_rating.countByKey().values()

#display

print("countByKey() action on RDD-subjects_rating: ",dictionary_rdd)

#get the values from the above dictionary rdd

for values in dictionary_rdd:

print(values)

Output:

countByKey() action on RDD-subjects_rating: dict_values([2, 2, 1, 1])

2

2

1

1

We can see that only the total values are returned.

Conclusion

In this PySpark RDD tutorial, we saw how to perform action on Pair RDD using countByKey() action. It used the items() method to return keys available with total occurrence (value). If you only need a key you can use the keys() method with countByKey() and if you need only value count , with countByKey() you can use values().

About the author

Gottumukkala Sravan Kumar

B tech-hon's in Information Technology; Known programming languages - Python, R , PHP MySQL; Published 500+ articles on computer science domain