
PySpark – Row_Number() Function

It is possible to partition the rows of a DataFrame using the Window function in PySpark. It is available in the pyspark.sql.window module.

The advantage of partitioning is that it keeps related rows grouped together. After performing the partition, we can assign numbers to the rows in each partition using the row_number() function.

It is possible to partition the rows in a DataFrame based on the values in a particular column: all rows that share the same value are placed in the same partition. If there are 3 distinct values in the column, each value forms its own partition, so there will be three partitions.
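For example, a quick way to see how many partitions Window.partitionBy() will create is to count the distinct values in the partitioning column. Here is a minimal sketch, assuming a hypothetical DataFrame df with a technology column:

# the number of window partitions equals the number of distinct values
df.select("technology").distinct().count()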

Row_Number() Window Function

The row_number() in PySpark is a window function that numbers the rows in each partition sequentially, starting from 1.

Note that row_number() is not available directly; we have to import it from the pyspark.sql.functions module.

Syntax to Import Module:

from pyspark.sql.functions import row_number

Alternatively, we can import all the functions from this module. Keep in mind that a wildcard import shadows Python built-ins such as sum(), min(), and max() with their column-based counterparts, so the explicit import is generally preferred.

from pyspark.sql.functions import *

Follow these steps to partition the data and apply row_number() to the partitioned rows.

Steps:

  1. Create a PySpark DataFrame that has repeated values in at least one column.
  2. Partition the data using the partitionBy() method of the Window class and order the rows within each partition using the orderBy() method.

Syntax:

partition = Window.partitionBy("column").orderBy("column")

We can order the rows within each partition by the partitioning column itself or by any other column, as shown in the sketch below.
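For instance, here is a minimal sketch (assuming the student DataFrame used in the examples below, with its technology1 and age columns) that partitions by one column but orders the rows within each partition by a different one:

from pyspark.sql.window import Window

# partition by technology1, but number the rows in each partition by age
partition = Window.partitionBy("technology1").orderBy("age")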

  3. Now, apply the row_number() function to the partitioned rows using the over() method, and add a column to store the row numbers using the withColumn() method.

Syntax:

dataframe_obj.withColumn("NAME",row_number().over(partition))

Here, NAME is the name of the new column that stores the row numbers, and dataframe_obj is our PySpark DataFrame.

Let’s implement the code.

Example 1:

Here, we create a PySpark DataFrame with 10 rows and 5 columns – ['subject_id', 'name', 'age', 'technology1', 'technology2'] – partition the rows based on the technology1 column using the Window function, and store the row number of each row within its partition in a new column named "Row for technology1".

from pyspark.sql import SparkSession

spark_app = SparkSession.builder.appName('_').getOrCreate()

students =[(4,'sravan',23,'PHP','Testing'),
(2,'sravan',23,'Oracle','Testing'),
(46,'mounika',22,'.NET','HTML'),
(12,'deepika',21,'Oracle','HTML'),
(46,'mounika',22,'Oracle','Testing'),
(12,'chandrika',23,'Hadoop','C#'),
(12,'chandrika',22,'Oracle','Testing'),
(45,'sravan',23,'Oracle','C#'),
(4,'deepika',21,'PHP','C#'),
(46,'mounika',22,'.NET','Testing')
]

dataframe_obj = spark_app.createDataFrame(students, ['subject_id', 'name', 'age', 'technology1', 'technology2'])

print("----------Actual DataFrame----------")
dataframe_obj.show()

# import the Window class
from pyspark.sql.window import Window

# import row_number from pyspark.sql.functions
from pyspark.sql.functions import row_number

# partition the DataFrame based on the values in the technology1 column and
# order the rows in each partition based on the subject_id column
partition = Window.partitionBy("technology1").orderBy("subject_id")

print("----------Partitioned DataFrame----------")

# add the row number of each row within its partition in the "Row for technology1" column
dataframe_obj.withColumn("Row for technology1", row_number().over(partition)).show()

Output:

Explanation:

The first output represents the actual data present in the DataFrame. In the second output, the partition is done based on the technology1 column.

The total number of partitions is 4.

Partition 1:

.NET occurred two times in the first partition. So, the row_numbers are 1 and 2.

Partition 2:

Hadoop occurred one time in the second partition. So, the row_number is 1.

Partition 3:

Oracle occurred five times in the third partition. So, the row_numbers are 1, 2, 3, 4, and 5.

Partition 4:

PHP occurred two times in the fourth partition. So, the row_numbers are 1 and 2.
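Note that within the Oracle partition, two rows share the subject_id 12, so their relative order (and therefore their row numbers) is not guaranteed to be stable across runs, since row_number() breaks ties arbitrarily. A minimal sketch of one way to make the numbering deterministic is to pass an extra tie-breaking column to orderBy() (here name, chosen as an assumption about what distinguishes the tied rows):

# order by subject_id first, then break ties by name
partition = Window.partitionBy("technology1").orderBy("subject_id", "name")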

Example 2:

For the same DataFrame, we partition the rows based on the technology2 column using the Window function and store the row number of each row within its partition in a new column named "Row for technology2".

from pyspark.sql import SparkSession

spark_app = SparkSession.builder.appName('_').getOrCreate()

students =[(4,'sravan',23,'PHP','Testing'),
(2,'sravan',23,'Oracle','Testing'),
(46,'mounika',22,'.NET','HTML'),
(12,'deepika',21,'Oracle','HTML'),
(46,'mounika',22,'Oracle','Testing'),
(12,'chandrika',23,'Hadoop','C#'),
(12,'chandrika',22,'Oracle','Testing'),
(45,'sravan',23,'Oracle','C#'),
(4,'deepika',21,'PHP','C#'),
(46,'mounika',22,'.NET','Testing')
]

dataframe_obj = spark_app.createDataFrame(students, ['subject_id', 'name', 'age', 'technology1', 'technology2'])

print("----------Actual DataFrame----------")
dataframe_obj.show()

# import the Window class
from pyspark.sql.window import Window

# import row_number from pyspark.sql.functions
from pyspark.sql.functions import row_number

# partition the DataFrame based on the values in the technology2 column and
# order the rows in each partition based on the subject_id column
partition = Window.partitionBy("technology2").orderBy("subject_id")
print("----------Partitioned DataFrame----------")

# add the row number of each row within its partition in the "Row for technology2" column
dataframe_obj.withColumn("Row for technology2", row_number().over(partition)).show()

Output:

Explanation:

The first output represents the actual data present in the DataFrame. In the second output, the partition is done based on the technology2 column.

The total number of partitions is 3.

Partition 1:

C# occurred three times in the first partition. So, the row_numbers are 1, 2, and 3.

Partition 2:

HTML occurred two times in the second partition. So, the row_numbers are 1 and 2.

Partition 3:

Testing occurred five times in the third partition. So, the row_numbers are 1, 2, 3, 4, and 5.
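A common follow-up to row_number() is keeping only the first row of each partition, for example to pick one record per group. A minimal sketch, reusing dataframe_obj and the technology2 window specification from Example 2:

from pyspark.sql.functions import col

# number the rows in each technology2 partition, then keep only the first row
numbered = dataframe_obj.withColumn("Row for technology2", row_number().over(partition))
numbered.filter(col("Row for technology2") == 1).show()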

Conclusion

We discussed how to partition the rows of a PySpark DataFrame and number the rows in each partition using the row_number() window function. The row_number() in PySpark is a window function that numbers the rows in each partition sequentially, starting from 1. Make sure to import it before use: from pyspark.sql.functions import row_number.
