Apache Spark

PySpark – Lead() Function

The lead() function in PySpark is available in the Window module which is used to return the next rows values to the current rows. First, the lead() function returns the null for the last row/s in a partition. It takes an offset parameter which represents the total number of rows such that the next row values are returned to the actual rows. The nulls are placed for first last row/s (offset).

It is possible to partition the rows in the DataFrame using the Window function. It is available in the pyspark.sql.window module.

Syntax:

dataframe_obj.withColumn("LEAD_Column",lead("column", offset).over(partition))

It takes two parameters:

  1. The LEAD_Column is the column name in PySpark DataFrame in which the leaded row values are placed based on the values in this column.
  2. The offset specifies the integer to return that number of the next rows to the current row values.

Steps:

  1. Create a PySpark DataFrame that has some similar values in at least one column.
  2. Partition the data using the partitionBy() method available in the Window function and order them based on the column using the orderBy() function.

Syntax:

partition=Window.partitionBy(“column”).orderBy(“column”)

We can order the partitioned data with the partitioned column or any other column.

Now, you can use the lead() function on the partitioned rows using the over() function.

We add a column to store the row number using the withColumn() function.

Syntax:

dataframe_obj.withColumn("LEAD_Column",lead("column", offset).over(partition))

Here, the NAME specifies the row name and the dataframe_obj is our PySpark DataFrame.

Let’s implement the code.

Example 1:

Here, we create a PySpark DataFrame that has 5 columns – [‘subject_id’,’name’,’age’,’technology1′,’technology2′] with 10 rows and partition the rows based on the technology1 using the Window function. After that, we lead 1 row.

import pyspark
from pyspark.sql import *

spark_app = SparkSession.builder.appName('_').getOrCreate()

students =[(4,'sravan',23,'PHP','Testing'),
(4,'sravan',23,'PHP','Testing'),
(46,'mounika',22,'.NET','HTML'),
(4,'deepika',21,'Oracle','HTML'),
(46,'mounika',22,'Oracle','Testing'),
(12,'chandrika',22,'Hadoop','C#'),
(12,'chandrika',22,'Oracle','Testing'),
(4,'sravan',23,'Oracle','C#'),
(4,'deepika',21,'PHP','C#'),
(46,'mounika',22,'.NET','Testing')
]

dataframe_obj = spark_app.createDataFrame( students,['subject_id','name','age','technology1','technology2'])
print("----------Actual DataFrame----------")

dataframe_obj.show()

# import the Window Function
from pyspark.sql.window import Window

#import the lead from pyspark.sql.functions
from pyspark.sql.functions import lead

#partition the dataframe based on the values in technology1 column and
#order the rows in each partition based on subject_id column
partition = Window.partitionBy("technology1").orderBy('subject_id')
print("----------Partitioned DataFrame----------")

#Now mention LEAD with offset-1 based on subject_id
dataframe_obj.withColumn("LEAD",lead("subject_id",1).over(partition)).show()

Output:

Explanation:

The first output represents the actual data present in the DataFrame. In the second output, the partition is done based on the technology1 column.

The total number of partitions is 4.

Partition 1:

The .NET occurred two times in the first partition. Since we specified the lead-offset as 1, the last .NET value is null and the first .NET value is next row subject_id value – 46.

Partition 2:

Hadoop occurred one time in the second partition. So, lead is null.

Partition 3:

Oracle occurred four times in the third partition.

  1. For the last Oracle, lead is null.
  2. For the first Oracle, the lead value is 4 (since the next row subject_id value is 4).
  3. For the third Oracle, the lead value is 12 (since the next row subject_id value is 12).

For the fourth Oracle, the lead value is 46 (since the next row subject_id value is 46).

Partition 4:

PHP occurred three times in the fourth partition.

  1. The lead value for the 3rd PHP is null.
  2. The lead value for the 1st PHP is 4 (since the next row subject_id value is 4).
  3. The lead value for the 2nd PHP is 4 (since the next row subject_id value is 4).

Example 2:

Lead the rows by 2. Make sure that you created the PySpark DataFrame as seen in Example 1.

# import the Window Function
from pyspark.sql.window import Window

#import the lead from pyspark.sql.functions
from pyspark.sql.functions import lead

#partition the dataframe based on the values in technology1 column and
#order the rows in each partition based on subject_id column
partition = Window.partitionBy("technology1").orderBy('subject_id')
print("----------Partitioned DataFrame----------")

#Now mention LEAD with offset-2 based on subject_id
dataframe_obj.withColumn("LEAD",lead("subject_id",2).over(partition)).show()

Output:

Explanation:

The partition is done based on the technology1 column.

The total numbers of partitions is 4.

Partition 1:

The .NET occurred two times in the first partition. Since we specified the lead-offset as 2, the offset is null for both values.

Partition 2:

Hadoop occurred one time in the second partition. So, lead is null.

Partition 3:

Oracle occurred four times in the third partition.

  • In the last two Oracle, the lead is null.
  • For the first Oracle, the lead value is 12 (since the next 2 rows subject_id value is 12).
  • For the second Oracle, the lead value is 46 (since the next 2 rows subject_id value is 46).

Partition 4:

PHP occurred three times in the fourth partition.

  • In the last two Oracle, the lead is null.
  • For the first PHP, the lead value is 4 (since the next 2 rows subject_id value is 4).

Example 3:

Lead the rows by 2 based on the age column. Make sure that you created the PySpark DataFrame as seen in Example 1.

# import the Window Function
from pyspark.sql.window import Window

#import the lead from pyspark.sql.functions
from pyspark.sql.functions import lead

#partition the dataframe based on the values in technology1 column and
#order the rows in each partition based on subject_id column
partition = Window.partitionBy("technology1").orderBy('subject_id')
print("----------Partitioned DataFrame----------")

#Now mention LEAD with offset-2 based on age
dataframe_obj.withColumn("LEAD",lead("age",2).over(partition)).show()

Output:

Explanation:

The partition is done based on the technology1 column and the lead is defined based on the age column.

The total number of partitions is 4.

Partition 1:

The .NET occurred two times in the first partition. Since we specified the lead-offset as 2, the offset is null for both values.

Partition 2:

Hadoop occurred one time in the second partition. So, the lead is null.

Partition 3:

Oracle occurred four times in the third partition.

  • For the last two Oracle, the lead is null.
  • For the first Oracle, the lead value is 22 (since the next 2 rows age value is 22).
  • For the second Oracle, the lead value is 22 (since the next 2 rows age value is 22).

Partition 4:

PHP occurred three times in the fourth partition.

  • For the last two Oracle, the lead is null.
  • For the first PHP, the lead value is 21 (since the next 2 rows age value is 21).

Conclusion

We learned how to get the lead values in PySpark DataFrame in the partitioned rows. The lead() function in PySpark is available in the Window module which is used to return the next row values to the current rows. It takes an offset parameter which represents the total number of rows such that the next row values are returned to the actual rows. For the first last row/s, the (offset) nulls are placed. We learned the different examples by setting the different offsets.

About the author

Gottumukkala Sravan Kumar

B tech-hon's in Information Technology; Known programming languages - Python, R , PHP MySQL; Published 500+ articles on computer science domain