PySpark – Min() Function

After partitioning the rows of a PySpark DataFrame, we can return the minimum and maximum values in each partition by applying the min() and max() aggregate functions over a partition window.

First, we see how to partition the DataFrame in PySpark.

Partition

The rows in a DataFrame can be partitioned using the Window class, which is available in the pyspark.sql.window module. Partitioning the rows in a PySpark DataFrame takes two steps.

Steps:

  1. Create a PySpark DataFrame that has some similar values in at least one column.
  2. Partition the data using the partitionBy() method available in the Window class.

Syntax:

partition = Window.partitionBy("column")

We can order the partitioned data with the partitioned column or any other column.

Let’s create the DataFrame.

Example:

Here, we create a PySpark DataFrame that has 5 columns, ['subject_id', 'name', 'age', 'technology1', 'technology2'], with 10 rows.

# SparkSession is the entry point for creating DataFrames
from pyspark.sql import SparkSession

# create (or reuse) the Spark session used throughout this tutorial
spark_app = SparkSession.builder.appName('_').getOrCreate()

students =[(4,'sravan',23,'PHP','Testing'),
(4,'sravan',23,'PHP','Testing'),
(46,'mounika',22,'.NET','HTML'),
(4,'deepika',21,'Oracle','HTML'),
(46,'mounika',22,'Oracle','Testing'),
(12,'chandrika',22,'Hadoop','C#'),
(12,'chandrika',22,'Oracle','Testing'),
(4,'sravan',23,'Oracle','C#'),
(4,'deepika',21,'PHP','C#'),
(46,'mounika',22,'.NET','Testing')
]

dataframe_obj = spark_app.createDataFrame(students, ['subject_id', 'name', 'age', 'technology1', 'technology2'])
print("----------Actual DataFrame----------")
dataframe_obj.show()

PySpark Min() Window Function

Applied over a window, the min() function returns the minimum value in each partition. Every row stays in the DataFrame and receives the minimum of the partition it belongs to.

Syntax:

dataframe_obj.withColumn("MINIMUM",min(col("column")).over(partition))

Parameter:

min(col("column"))

Here, the min() function takes the column (a column name or a Column object such as col("column")) as its parameter. It returns the minimum of the values in this column within each partition.

We add the result into a column named MINIMUM using the withColumn() function.
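To make the semantics concrete, here is a minimal plain-Python sketch (no Spark involved) of what a partitioned min() computes. The four rows are a small subset of the tutorial's data, and the variable names are hypothetical. It only models the result, not how Spark executes it.

```python
# a few (technology1, subject_id) pairs taken from the tutorial's data
rows = [
    {"technology1": "PHP", "subject_id": 4},
    {"technology1": ".NET", "subject_id": 46},
    {"technology1": "Oracle", "subject_id": 12},
    {"technology1": "Oracle", "subject_id": 4},
]

# step 1: find the minimum subject_id for each technology1 value (each partition)
minimum_by_key = {}
for row in rows:
    key = row["technology1"]
    value = row["subject_id"]
    if key not in minimum_by_key or value < minimum_by_key[key]:
        minimum_by_key[key] = value

# step 2: attach the partition's minimum to every row; no rows are dropped
with_minimum = [
    {**row, "MINIMUM": minimum_by_key[row["technology1"]]}
    for row in rows
]

for row in with_minimum:
    print(row)
```

The point of the sketch is that the row count is unchanged: unlike a groupBy() aggregation, a window aggregate adds a column instead of collapsing each partition to a single row.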

Example 1:

Using the DataFrame created previously, let’s partition it based on the technology1 column and get the minimum value of the subject_id column in each partition.

# import the Window Function
from pyspark.sql.window import Window

#import the min and col from pyspark.sql.functions
from pyspark.sql.functions import min,col

#partition the dataframe based on the values in technology1 column
partition = Window.partitionBy("technology1")
print("----------Partitioned DataFrame----------")

#return the minimum value in subject_id column for each partition
dataframe_obj.withColumn("MINIMUM",min(col("subject_id")).over(partition)).show()

Explanation:

The total number of partitions is 4.

Partition 1:

.NET occurred two times in the first partition. The minimum value in the subject_id column is 46.

Partition 2:

Hadoop occurred one time in the second partition. So, the minimum value is 12.

Partition 3:

Oracle occurred four times in the third partition.

The minimum value in the subject_id column is 4.

Partition 4:

PHP occurred three times in the fourth partition.

The minimum value in the subject_id column is 4.

Example 2:

Using the DataFrame created previously, let’s partition it based on the technology1 column and get the minimum value of the age column in each partition.

# import the Window Function
from pyspark.sql.window import Window

#import the min and col from pyspark.sql.functions
from pyspark.sql.functions import min,col

#partition the dataframe based on the values in technology1 column
partition = Window.partitionBy("technology1")

print("----------Partitioned DataFrame----------")
#return the minimum value in age column for each partition
dataframe_obj.withColumn("MINIMUM",min(col("age")).over(partition)).show()

Explanation:

The total number of partitions is 4.

Partition 1:

.NET occurred two times in the first partition. The minimum value in the age column is 22.

Partition 2:

Hadoop occurred one time in the second partition. The minimum value is 22.

Partition 3:

Oracle occurred four times in the third partition.

The minimum value in the age column is 21.

Partition 4:

PHP occurred three times in the fourth partition.

The minimum value in the age column is 21.

PySpark Max() Window Function

Applied over a window, the max() function returns the maximum value in each partition. Every row stays in the DataFrame and receives the maximum of the partition it belongs to.

Syntax:

dataframe_obj.withColumn("MAXIMUM",max(col("column")).over(partition))

Parameter:

max(col("column"))

Here, the max() function takes the column (a column name or a Column object such as col("column")) as its parameter. It returns the maximum of the values in this column within each partition.

We add the result into a column named MAXIMUM using the withColumn() function.

Example 1:

Using the DataFrame created previously, let’s partition it based on the technology1 column and get the maximum value of the subject_id column in each partition.

# import the Window Function
from pyspark.sql.window import Window
#import the max and col from pyspark.sql.functions
from pyspark.sql.functions import max,col

#partition the dataframe based on the values in technology1 column
partition = Window.partitionBy("technology1")
print("----------Partitioned DataFrame----------")

#return the maximum value in subject_id column for each partition
dataframe_obj.withColumn("MAXIMUM",max(col("subject_id")).over(partition)).show()

Explanation:

The total number of partitions is 4.

Partition 1:

.NET occurred two times in the first partition. The maximum value in the subject_id column is 46.

Partition 2:

Hadoop occurred one time in the second partition. So, the maximum value is 12.

Partition 3:

Oracle occurred four times in the third partition.

The maximum value in the subject_id column is 46.

Partition 4:

PHP occurred three times in the fourth partition.

The maximum value in the subject_id column is 4.

Example 2:

Using the DataFrame created previously, let’s partition it based on the technology1 column and get the maximum value of the age column in each partition.

# import the Window Function
from pyspark.sql.window import Window

#import the max and col from pyspark.sql.functions
from pyspark.sql.functions import max,col

#partition the dataframe based on the values in technology1 column
partition = Window.partitionBy("technology1")
print("----------Partitioned DataFrame----------")

#return the maximum value in age column for each partition
dataframe_obj.withColumn("MAXIMUM",max(col("age")).over(partition)).show()

Explanation:

The total number of partitions is 4.

Partition 1:

.NET occurred two times in the first partition. The maximum value in the age column is 22.

Partition 2:

Hadoop occurred one time in the second partition. So, the maximum value is 22.

Partition 3:

Oracle occurred four times in the third partition.

The maximum value in the age column is 23.

Partition 4:

PHP occurred three times in the fourth partition.

The maximum value in the age column is 23.

Conclusion

In this PySpark partitioning tutorial, we learned how to return the minimum value in each partitioned window using the min() function and the maximum value in each partitioned window using the max() function. We added the result to the existing DataFrame as a new column. Make sure to import min and max from the pyspark.sql.functions module.

About the author

Gottumukkala Sravan Kumar

B tech-hon's in Information Technology; Known programming languages - Python, R , PHP MySQL; Published 500+ articles on computer science domain