
PySpark rank() Window Function

Assigning a rank number to each row of a PySpark DataFrame makes it easy to identify and locate rows within a partition.

In PySpark, you can partition the rows of a DataFrame using the Window class, which is available in the pyspark.sql.window module.

Partitioning groups rows that share a value, so calculations can be performed within each group. After partitioning, we can assign ranks to the rows in each partition using the rank() function.

The rows of a DataFrame can be partitioned based on the values in a particular column, so that all rows with the same value in that column end up in the same partition.
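As a rough illustration of what partitioning by column values means, the plain-Python sketch below (not Spark code) groups a few rows from the example data by their technology1 value. The helper name partition_by is hypothetical; Spark does not group rows this way internally, but the resulting groups match the partitions that Window.partitionBy("technology1") describes.

```python
from collections import defaultdict

# A few (subject_id, name, technology1) rows taken from the example data
rows = [
    (4, "sravan", "PHP"),
    (46, "mounika", ".NET"),
    (4, "deepika", "Oracle"),
    (12, "chandrika", "Hadoop"),
    (12, "chandrika", "Oracle"),
    (46, "mounika", ".NET"),
]


def partition_by(rows, key_index):
    """Hypothetical helper: group rows into lists keyed by the value at key_index."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[key_index]].append(row)
    return dict(partitions)


partitions = partition_by(rows, 2)
for key, members in partitions.items():
    print(key, members)
```

Every row lands in exactly one group, and rows with the same technology1 value always share a group.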

rank() Window Function

rank() in PySpark is a window function that assigns a rank to each row within its partition.

Ranks start from 1. If two or more rows have the same value, they are assigned the same rank. The row after the tied rows gets a rank equal to its position in the ordered partition, so gaps appear in the sequence: for example, if two rows tie for rank 1, the next row gets rank 3, not 2.

The values that rank() compares come from the column specified inside orderBy(). Based on the values in this column, rank() assigns the rank numbers.
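The tie-and-gap rule can be sketched in plain Python. This is not Spark's implementation; the hypothetical helper sql_rank takes values that are already sorted in ascending order (as orderBy() would sort them) and returns the rank that rank() would assign to each one.

```python
def sql_rank(sorted_values):
    """Hypothetical helper: rank()-style ranks for values sorted in ascending order.

    Tied values share a rank, and the next distinct value gets its 1-based
    position in the list, so gaps appear after ties.
    """
    ranks = []
    for position, value in enumerate(sorted_values, start=1):
        if position > 1 and value == sorted_values[position - 2]:
            ranks.append(ranks[-1])  # tie: reuse the previous row's rank
        else:
            ranks.append(position)  # new value: rank equals its position
    return ranks


print(sql_rank([4, 4, 12, 46]))  # [1, 1, 3, 4]
```

The two rows with value 4 share rank 1, so rank 2 is skipped and the value 12 gets rank 3.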

Note that rank() is not available directly; it must be imported from the pyspark.sql.functions module.

Syntax To Import Module
from pyspark.sql.functions import rank

Alternatively, you can import all functions from this module:
from pyspark.sql.functions import *

Follow the steps below to partition the data and apply rank() to the partitioned data.

Steps:

  1. Create a PySpark DataFrame that has repeated values in at least one column.
  2. Partition the data using the partitionBy() method of the Window class, and order the rows within each partition by a column using orderBy().

    Syntax:
    partition = Window.partitionBy("column").orderBy("column")

    The partitioned data can be ordered by the partition column or by any other column.
  3. Apply the rank() function over the partitioned rows using the over() function.

    Add a new column that stores the rank using the withColumn() function.

    Syntax:
    dataframe_obj.withColumn("NAME", rank().over(partition))

    Here, NAME is the name of the new rank column, and dataframe_obj is our PySpark DataFrame.

    Let’s implement the code.
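To make the three steps concrete, here is a plain-Python sketch that mimics what withColumn("RANK", rank().over(Window.partitionBy(p).orderBy(o))) computes. It is an illustration, not Spark code: the function name rank_over and the dict-based rows are assumptions for this sketch, it sorts in ascending order only, and it assumes the ordering column contains no null values.

```python
from collections import defaultdict


def rank_over(rows, partition_col, order_col):
    """Hypothetical emulation of rank().over(Window.partitionBy(p).orderBy(o)).

    rows is a list of dicts; returns new dicts with an extra "RANK" key.
    """
    # Step 2a: partition the rows by the value in partition_col
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_col]].append(row)

    result = []
    for members in partitions.values():
        # Step 2b: order the rows inside each partition (ascending, like orderBy())
        ordered = sorted(members, key=lambda r: r[order_col])
        # Step 3: assign ranks; ties share a rank and leave a gap afterwards
        for position, row in enumerate(ordered, start=1):
            if position > 1 and row[order_col] == ordered[position - 2][order_col]:
                rank = result[-1]["RANK"]  # previous row of this same partition
            else:
                rank = position
            result.append({**row, "RANK": rank})
    return result


rows = [
    {"name": "sravan", "technology1": "PHP", "subject_id": 4},
    {"name": "deepika", "technology1": "Oracle", "subject_id": 4},
    {"name": "mounika", "technology1": "Oracle", "subject_id": 46},
    {"name": "chandrika", "technology1": "Oracle", "subject_id": 12},
    {"name": "sravan", "technology1": "Oracle", "subject_id": 4},
]

result = rank_over(rows, "technology1", "subject_id")
for r in result:
    print(r["technology1"], r["name"], r["subject_id"], r["RANK"])
```

Unlike Spark, this sketch processes everything in memory on one machine; it is only meant to show the partition, order, and rank steps in sequence.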

Example 1
Here, we will create a PySpark DataFrame with 5 columns, ['subject_id', 'name', 'age', 'technology1', 'technology2'], and 10 rows, and partition the rows based on technology1 using Window.partitionBy().

Finally, we will give ranks to the rows in all partitions by adding a new column named “RANK”.

# import SparkSession to create the Spark application
from pyspark.sql import SparkSession

# create (or reuse) a SparkSession
spark_app = SparkSession.builder.appName('_').getOrCreate()


students =[(4,'sravan',23,'PHP','Testing'),
           (4,'sravan',23,'PHP','Testing'),
           (46,'mounika',22,'.NET','HTML'),
           (4,'deepika',21,'Oracle','HTML'),
           (46,'mounika',22,'Oracle','Testing'),
           (12,'chandrika',22,'Hadoop','C#'),
           (12,'chandrika',22,'Oracle','Testing'),
           (4,'sravan',23,'Oracle','C#'),
           (4,'deepika',21,'PHP','C#'),
           (46,'mounika',22,'.NET','Testing')
              ]

dataframe_obj = spark_app.createDataFrame( students,['subject_id','name','age','technology1','technology2'])

print("----------Actual DataFrame----------")
dataframe_obj.show()

# import the Window Function
from pyspark.sql.window import Window

#import the rank from pyspark.sql.functions
from pyspark.sql.functions import rank

#partition the dataframe based on the values in technology1 column and
#order the rows in each partition based on subject_id column
partition = Window.partitionBy("technology1").orderBy('subject_id')

print("----------Partitioned DataFrame----------")

#Now mention rank for each row in RANK column
dataframe_obj.withColumn("RANK",rank().over(partition)).show()

Output:

Explanation
The first output represents the actual data present in the dataframe.

In the second output, the partition is done based on the technology1 column.

Total partitions: 4

Partition 1:
.NET occurred two times in the first partition. Both rows have the same subject_id (46). Hence, rank() returns the same rank, 1, for both .NET rows.

Partition 2:
Hadoop occurred one time in the second partition. So, the rank is 1.

Partition 3:
Oracle occurred four times in the third partition.
The first two Oracle rows have the same subject_id (4), so both are assigned rank 1.
The third Oracle row (subject_id 12) gets rank 3; rank 2 is skipped because the two tied rows occupy positions 1 and 2.
The last Oracle row (subject_id 46) gets rank 4.

Partition 4:
PHP occurred three times in the fourth partition, and all three rows have the same subject_id (4). Hence, all PHP rows get rank 1.
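The ranks described above can be cross-checked without Spark. This plain-Python sketch reuses the students list from Example 1 and computes each row's rank as 1 plus the number of rows in the same partition with a strictly smaller subject_id, which matches rank() for ascending order. It lists only the ranks per partition, not the full table that show() prints.

```python
from collections import defaultdict

# Same rows as Example 1: (subject_id, name, age, technology1, technology2)
students = [(4, 'sravan', 23, 'PHP', 'Testing'),
            (4, 'sravan', 23, 'PHP', 'Testing'),
            (46, 'mounika', 22, '.NET', 'HTML'),
            (4, 'deepika', 21, 'Oracle', 'HTML'),
            (46, 'mounika', 22, 'Oracle', 'Testing'),
            (12, 'chandrika', 22, 'Hadoop', 'C#'),
            (12, 'chandrika', 22, 'Oracle', 'Testing'),
            (4, 'sravan', 23, 'Oracle', 'C#'),
            (4, 'deepika', 21, 'PHP', 'C#'),
            (46, 'mounika', 22, '.NET', 'Testing')]

# Collect the subject_id values of each technology1 partition
ids_by_tech1 = defaultdict(list)
for subject_id, _name, _age, technology1, _technology2 in students:
    ids_by_tech1[technology1].append(subject_id)

# For ascending order, rank() equals 1 + the number of rows in the same
# partition whose ordering value is strictly smaller
expected_ranks = {
    tech: sorted(1 + sum(other < value for other in ids) for value in ids)
    for tech, ids in ids_by_tech1.items()
}

for tech, ranks in sorted(expected_ranks.items()):
    print(tech, ranks)
```

It prints [1, 1] for .NET, [1] for Hadoop, [1, 1, 3, 4] for Oracle, and [1, 1, 1] for PHP, matching the four partitions described above.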

Example 2
Here, we will partition the rows based on technology2 using the Window function.

Finally, we will give ranks to the rows based on the subject_id column in all partitions by adding a new column named “RANK”.

(If you haven’t created the PySpark DataFrame yet, create it using the code from the first example.)

# import the Window Function
from pyspark.sql.window import Window

#import the rank from pyspark.sql.functions
from pyspark.sql.functions import rank

#partition the dataframe based on the values in technology2 column and
#order the rows in each partition based on subject_id column
partition = Window.partitionBy("technology2").orderBy('subject_id')

print("----------Partitioned DataFrame----------")

#Now mention rank for each row in RANK column
dataframe_obj.withColumn("RANK",rank().over(partition)).show()

Output:

Explanation
The output shows the DataFrame partitioned based on the technology2 column and ordered by the subject_id column.

Total partitions: 3

Partition 1:
C# occurred three times in the first partition. Ordered by subject_id, the values are 4, 4, and 12. The first two are the same, so rank() returns rank 1 for both, and the third C# row gets rank 3.

Partition 2:
HTML occurred twice in the second partition, and the two subject_id values (4 and 46) are different. So, rank 1 is assigned to the first HTML row and rank 2 to the second.

Partition 3:
Testing occurred five times in the third partition. Ordered by subject_id, the values are 4, 4, 12, 46, and 46.
The first two Testing rows have the same subject_id (4), so both are assigned rank 1.
The third Testing row (subject_id 12) gets rank 3; rank 2 is skipped because the two tied rows occupy positions 1 and 2.
The last two Testing rows have the same subject_id (46), so both get rank 4.
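The gap after a tie in the Testing partition can be checked with a few lines of plain Python (not Spark). The subject_id values below are the five Testing rows from Example 1's data; the sketch uses the fact that, in an ascending list, the first index of a value equals the number of strictly smaller values before it.

```python
# subject_id values of the five Testing rows in Example 1's data, in input order
testing_ids = [4, 4, 46, 12, 46]

# orderBy('subject_id') sorts them in ascending order
ordered = sorted(testing_ids)

# list.index() returns the first position of a value, i.e. the number of
# strictly smaller values before it, so index + 1 is the rank() value
ranks = [ordered.index(value) + 1 for value in ordered]

print(list(zip(ordered, ranks)))  # [(4, 1), (4, 1), (12, 3), (46, 4), (46, 4)]
```

Both rows with subject_id 46 receive rank 4, because ties share the rank of their first position.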

Example 3
Here, we will partition the rows based on technology2 using the Window function.

Finally, we will give ranks to the rows based on the age column in all partitions by adding a new column named “RANK”.

(If you haven’t created the PySpark DataFrame yet, create it using the code from the first example.)

# import the Window Function
from pyspark.sql.window import Window

#import the rank from pyspark.sql.functions
from pyspark.sql.functions import rank

#partition the dataframe based on the values in technology2 column and
#order the rows in each partition based on age column
partition = Window.partitionBy("technology2").orderBy('age')

print("----------Partitioned DataFrame----------")

#Now mention rank for each row in RANK column
dataframe_obj.withColumn("RANK",rank().over(partition)).show()

Output:

Explanation
The output shows the DataFrame partitioned based on the technology2 column and ordered by the age column.

Total partitions: 3

Partition 1:
C# occurred three times in the first partition, and all the values in the age column (21, 22, and 23) are different. Hence, rank() returns ranks 1, 2, and 3 for the three C# rows.

Partition 2:
HTML occurred twice in the second partition, and the two age values (21 and 22) are different. Hence, rank() returns ranks 1 and 2 for the two HTML rows.

Partition 3:
Testing occurred five times in the third partition.
The first three Testing rows have the same age (22), so all three are assigned rank 1.
The fourth and fifth Testing rows have the same age (23), so both get rank 4 (ranks 2 and 3 are skipped).
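The ranks for all three technology2 partitions ordered by age can be reproduced in plain Python (not Spark) with the standard bisect module: bisect_left on a sorted list returns how many values are strictly smaller, so adding 1 gives the rank() value. The (age, technology2) pairs are taken from Example 1's data.

```python
from bisect import bisect_left
from collections import defaultdict

# (age, technology2) pairs from Example 1's data, in input order
rows = [(23, 'Testing'), (23, 'Testing'), (22, 'HTML'), (21, 'HTML'),
        (22, 'Testing'), (22, 'C#'), (22, 'Testing'), (23, 'C#'),
        (21, 'C#'), (22, 'Testing')]

# Group the ages by technology2 partition
ages_by_tech2 = defaultdict(list)
for age, technology2 in rows:
    ages_by_tech2[technology2].append(age)

ranks_by_tech2 = {}
for technology2, ages in ages_by_tech2.items():
    ordered = sorted(ages)  # ascending, like orderBy('age')
    # bisect_left counts the values in `ordered` strictly smaller than `age`,
    # so adding 1 gives the rank() value
    ranks_by_tech2[technology2] = [bisect_left(ordered, age) + 1 for age in ordered]

for technology2, ranks in ranks_by_tech2.items():
    print(technology2, ranks)
```

The Testing partition yields [1, 1, 1, 4, 4], which matches the explanation above.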

Conclusion

We discussed how to partition the rows of a PySpark DataFrame and assign a rank within each partition using the rank() window function. rank() gives tied rows the same rank and skips the following rank numbers accordingly. Remember to import it before use: from pyspark.sql.functions import rank.

About the author

Gottumukkala Sravan Kumar

B tech-hon's in Information Technology; Known programming languages - Python, R , PHP MySQL; Published 500+ articles on computer science domain