RDD stands for Resilient Distributed Dataset, the fundamental data structure in Apache Spark. A pair RDD stores its elements as key-value pairs, in the format (key, value).
We need to import RDD from the pyspark.rdd module.
In PySpark, we can create an RDD with the parallelize() method.
Syntax:
spark_app.sparkContext.parallelize(data)
Where:
data can be one-dimensional (linear data) or two-dimensional (row-column data).
RDD Actions:
An action in RDD is an operation that is applied to an RDD and returns a single value. In other words, an action computes a result from the data in the given RDD.
countByKey() is an action commonly applied to pair RDDs, and it is the one covered in this tutorial.
countByKey()
As we know, a pair RDD holds key-value elements. countByKey() returns each key in the RDD along with its total number of occurrences as the value.
The pairs can be extracted using the items() method, which is a dictionary method in Python. items() extracts the key-value pairs from a dictionary; since dictionaries also store their entries as key-value pairs, the result of countByKey() behaves much like a dictionary.
So, we call the items() method on the result of the countByKey() action.
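As a rough sketch of what countByKey() computes, the same per-key counting can be reproduced in plain Python with collections.Counter (a stand-in for illustration only, not Spark's actual implementation; the sample pairs match the examples below):

```python
from collections import Counter

# the same six (subject, rating) pairs used in the examples below
pairs = [('python', 4), ('javascript', 2), ('linux', 5),
         ('C#', 4), ('javascript', 4), ('python', 3)]

# count occurrences of each key, ignoring the values --
# this mirrors what countByKey() computes on a pair RDD
key_counts = Counter(key for key, _ in pairs)

# items() exposes the result as (key, count) pairs, just as in the
# countByKey().items() calls shown later in this tutorial
print(key_counts.items())
```

Like the result of countByKey(), key_counts is dictionary-like, so items(), keys(), and values() all apply to it.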
Syntax:
Pair_RDD.countByKey().items()
Where Pair_RDD is the pair RDD.
It returns the count of values per key in the format – dict_items([(key, value), ……])
We can use a for loop to loop through the keys and values to return separately.
Example:
In this example, we created a pair RDD named subjects_rating with 6 key-value pairs and applied the countByKey() action on it. Finally, we display the result of the action itself and then loop through it with a for loop.
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
# import RDD from pyspark.rdd
from pyspark.rdd import RDD
#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()
# create 6 - subject and rating pairs
subjects_rating = spark_app.sparkContext.parallelize([('python',4),('javascript',2),('linux',5),('C#',4),('javascript',4),('python',3)])
#apply countByKey() action on the above subjects_rating pair RDD
dictionary_rdd = subjects_rating.countByKey().items()
#display
print("countByKey() action on RDD-subjects_rating: ",dictionary_rdd)
#get the keys and value counts from the above dictionary rdd
for keys, values in dictionary_rdd:
    print(keys,"-->", values)
Output:
python --> 2
javascript --> 2
linux --> 1
C# --> 1
In the pair RDD,
- the key 'python' occurred 2 times, so its returned value is 2
- the key 'javascript' occurred 2 times, so its returned value is 2
- the keys 'linux' and 'C#' occurred 1 time each, so the returned value for each is 1
countByKey() action with keys()
If you need to return only the keys, call the keys() method on the result of the countByKey() action.
Syntax:
Pair_RDD.countByKey().keys()
Where Pair_RDD is the pair RDD.
It returns the keys in the format – dict_keys([key, ……])
We can use a for loop to loop through the keys and print each one separately.
Example:
In this example, we created a pair RDD named subjects_rating with 6 key-value pairs and applied the countByKey() action with keys() on it to get only the keys. Finally, we display the result of the action itself and then loop through it with a for loop.
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
# import RDD from pyspark.rdd
from pyspark.rdd import RDD
#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()
# create 6 - subject and rating pairs
subjects_rating = spark_app.sparkContext.parallelize([('python',4),('javascript',2),('linux',5),('C#',4),('javascript',4),('python',3)])
#apply countByKey() action on the above subjects_rating pair RDD to get keys
dictionary_rdd = subjects_rating.countByKey().keys()
#display
print("countByKey() action on RDD-subjects_rating: ",dictionary_rdd)
#get the keys from the above dictionary rdd
for keys in dictionary_rdd:
    print(keys)
Output:
python
javascript
linux
C#
We can see that only the keys are returned.
countByKey() action with values()
If you need to return only the total count per key, call the values() method on the result of the countByKey() action.
Syntax:
Pair_RDD.countByKey().values()
Where Pair_RDD is the pair RDD.
It returns the counts in the format – dict_values([value, ……])
We can use a for loop to loop through the values and print each one separately.
Example:
In this example, we created a pair RDD named subjects_rating with 6 key-value pairs and applied the countByKey() action with values() on it to get only the counts. Finally, we display the result of the action itself and then loop through it with a for loop.
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
# import RDD from pyspark.rdd
from pyspark.rdd import RDD
#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()
# create 6 - subject and rating pairs
subjects_rating = spark_app.sparkContext.parallelize([('python',4),('javascript',2),('linux',5),('C#',4),('javascript',4),('python',3)])
#apply countByKey() action on the above subjects_rating pair RDD to get values
dictionary_rdd = subjects_rating.countByKey().values()
#display
print("countByKey() action on RDD-subjects_rating: ",dictionary_rdd)
#get the values from the above dictionary rdd
for values in dictionary_rdd:
    print(values)
Output:
2
2
1
1
We can see that only the total counts are returned.
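Because the result of countByKey() behaves like an ordinary Python dictionary, the three access patterns covered above can be sketched side by side on a plain dict standing in for the action's result (no Spark needed for this illustration):

```python
# a plain dict standing in for countByKey()'s result on subjects_rating
counts = {'python': 2, 'javascript': 2, 'linux': 1, 'C#': 1}

print(counts.items())   # key-count pairs, as with countByKey().items()
print(counts.keys())    # keys only, as with countByKey().keys()
print(counts.values())  # counts only, as with countByKey().values()
```

Each call returns a dictionary view object (dict_items, dict_keys, dict_values), which is why all three can be iterated with a for loop exactly as in the examples above.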
Conclusion
In this PySpark RDD tutorial, we saw how to perform an action on a pair RDD using countByKey(). We used the items() method to return each key along with its total occurrence count (value). If you need only the keys, you can use the keys() method with countByKey(), and if you need only the counts, you can use values() with countByKey().