First, we will see how to partition the DataFrame in PySpark.
Partition
We can partition the rows of a DataFrame using the Window class, which is available in the pyspark.sql.window module. Partitioning a PySpark DataFrame takes two steps.
Steps:
- Create a PySpark DataFrame that has some similar values in at least one column.
- Partition the data using the partitionBy() method of the Window class.
Syntax:
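partition = Window.partitionBy("column_name")

Here, column_name is the column whose repeated values define the groups.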
We can also order the rows within each partition by the partitioning column or by any other column.
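For instance, a window that partitions by one column and orders the rows within each partition by another (the column names here are just placeholders) can be built like this:

partition = Window.partitionBy("partition_column").orderBy("order_column")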
Let’s create the DataFrame.
Example:
Here, we create a PySpark DataFrame that has 5 columns – ['subject_id', 'name', 'age', 'technology1', 'technology2'] – with 10 rows.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark_app = SparkSession.builder.appName('_').getOrCreate()

# sample data: 10 rows with repeated values in the subject_id, name, and technology columns
students = [(4, 'sravan', 23, 'PHP', 'Testing'),
            (4, 'sravan', 23, 'PHP', 'Testing'),
            (46, 'mounika', 22, '.NET', 'HTML'),
            (4, 'deepika', 21, 'Oracle', 'HTML'),
            (46, 'mounika', 22, 'Oracle', 'Testing'),
            (12, 'chandrika', 22, 'Hadoop', 'C#'),
            (12, 'chandrika', 22, 'Oracle', 'Testing'),
            (4, 'sravan', 23, 'Oracle', 'C#'),
            (4, 'deepika', 21, 'PHP', 'C#'),
            (46, 'mounika', 22, '.NET', 'Testing')]

# create the DataFrame with the 5 column names
dataframe_obj = spark_app.createDataFrame(students, ['subject_id', 'name', 'age', 'technology1', 'technology2'])
print("----------Actual DataFrame----------")
dataframe_obj.show()
Output:
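If everything runs as expected, the output should look like the following (show() displays the rows in insertion order):

+----------+---------+---+-----------+-----------+
|subject_id|     name|age|technology1|technology2|
+----------+---------+---+-----------+-----------+
|         4|   sravan| 23|        PHP|    Testing|
|         4|   sravan| 23|        PHP|    Testing|
|        46|  mounika| 22|       .NET|       HTML|
|         4|  deepika| 21|     Oracle|       HTML|
|        46|  mounika| 22|     Oracle|    Testing|
|        12|chandrika| 22|     Hadoop|         C#|
|        12|chandrika| 22|     Oracle|    Testing|
|         4|   sravan| 23|     Oracle|         C#|
|         4|  deepika| 21|        PHP|         C#|
|        46|  mounika| 22|       .NET|    Testing|
+----------+---------+---+-----------+-----------+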
PySpark variance() Window Function
The variance() window function returns the variance within each partition. This makes it possible to compute a per-partition variance after partitioning the DataFrame.
Syntax:
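variance(col("column_name")).over(partition)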
Parameter:
Here, the variance() function takes a column name as its parameter and returns the variance of that column within each partition. Note that variance() is an alias for var_samp(), the sample variance, which is why a partition containing a single row returns null.
We add the result to a new column named VARIANCE using the withColumn() function.
Example 1:
Now, using the DataFrame created previously, let's partition it based on the technology1 column and get the variance of the subject_id column within each partition.

from pyspark.sql.window import Window
# import the variance and col functions from pyspark.sql.functions
from pyspark.sql.functions import variance, col

# partition the DataFrame based on the values in the technology1 column
partition = Window.partitionBy("technology1")

print("----------Partitioned DataFrame----------")
# return the variance of the subject_id column within each partition
dataframe_obj.withColumn("VARIANCE", variance(col("subject_id")).over(partition)).show()
Output:
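The output should resemble the following; note that Spark may display the partitions (and the rows within them) in a different order:

+----------+---------+---+-----------+-----------+--------+
|subject_id|     name|age|technology1|technology2|VARIANCE|
+----------+---------+---+-----------+-----------+--------+
|        46|  mounika| 22|       .NET|       HTML|     0.0|
|        46|  mounika| 22|       .NET|    Testing|     0.0|
|        12|chandrika| 22|     Hadoop|         C#|    null|
|         4|  deepika| 21|     Oracle|       HTML|   401.0|
|        46|  mounika| 22|     Oracle|    Testing|   401.0|
|        12|chandrika| 22|     Oracle|    Testing|   401.0|
|         4|   sravan| 23|     Oracle|         C#|   401.0|
|         4|   sravan| 23|        PHP|    Testing|     0.0|
|         4|   sravan| 23|        PHP|    Testing|     0.0|
|         4|  deepika| 21|        PHP|         C#|     0.0|
+----------+---------+---+-----------+-----------+--------+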
Explanation:
The total number of partitions is 4.
Partition 1:
.NET occurs two times in the first partition; since both rows share the same subject_id (46), the variance of the subject_id column is 0.0.
Partition 2:
Hadoop occurs only once in the second partition, so the sample variance is null.
Partition 3:
Oracle occurs four times in the third partition, and the variance of the subject_id column is 401.0.
Partition 4:
PHP occurs three times in the fourth partition; all three rows have subject_id 4, so the variance is 0.0.
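As a quick check, the Oracle partition holds the subject_id values 4, 46, 12, and 4. Their mean is 16.5, so the sample variance is ((4 − 16.5)² + (46 − 16.5)² + (12 − 16.5)² + (4 − 16.5)²) / (4 − 1) = (156.25 + 870.25 + 20.25 + 156.25) / 3 = 401.0.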
Example 2:
Now, using the same DataFrame, let's partition it based on the technology1 column and get the variance of the age column within each partition.

from pyspark.sql.window import Window
# import the variance and col functions from pyspark.sql.functions
from pyspark.sql.functions import variance, col

# partition the DataFrame based on the values in the technology1 column
partition = Window.partitionBy("technology1")

print("----------Partitioned DataFrame----------")
# return the variance of the age column within each partition
dataframe_obj.withColumn("VARIANCE", variance(col("age")).over(partition)).show()
Output:
Explanation:
The total number of partitions is 4.
Partition 1:
.NET occurs two times in the first partition; since both rows have age 22, the variance of the age column is 0.0.
Partition 2:
Hadoop occurs only once in the second partition, so the sample variance is null.
Partition 3:
Oracle occurs four times in the third partition, and the variance of the age column is about 0.6667.
Partition 4:
PHP occurs three times in the fourth partition, and the variance of the age column is about 1.3333.
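As a quick check, the Oracle partition holds the age values 21, 22, 22, and 23, whose mean is 22, so the sample variance is (1 + 0 + 0 + 1) / 3 ≈ 0.6667. The PHP partition holds 23, 23, and 21, whose mean is about 22.3333, giving (0.4444 + 0.4444 + 1.7778) / 2 ≈ 1.3333.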
PySpark stddev() Window Function
The stddev() window function returns the standard deviation within each partition. This makes it possible to compute a per-partition standard deviation after partitioning the DataFrame.
Syntax:
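stddev(col("column_name")).over(partition)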
Parameter:
Here, stddev() takes a column name as its parameter and returns the standard deviation of that column within each partition. Like variance(), it is an alias for the sample version (stddev_samp()), so a single-row partition returns null.
We add the result to a new column named STANDARD DEVIATION using the withColumn() function.
Example 1:
Now, using the DataFrame created previously, let's partition it based on the technology1 column and get the standard deviation of the subject_id column within each partition.

from pyspark.sql.window import Window
# import the stddev and col functions from pyspark.sql.functions
from pyspark.sql.functions import stddev, col

# partition the DataFrame based on the values in the technology1 column
partition = Window.partitionBy("technology1")

print("----------Partitioned DataFrame----------")
# return the standard deviation of the subject_id column within each partition
dataframe_obj.withColumn("STANDARD DEVIATION", stddev(col("subject_id")).over(partition)).show()
Output:
Explanation:
The total number of partitions is 4.
Partition 1:
.NET occurs two times in the first partition; since both rows share the same subject_id (46), the standard deviation of the subject_id column is 0.0.
Partition 2:
Hadoop occurs only once in the second partition, so the standard deviation is null.
Partition 3:
Oracle occurs four times in the third partition, and the standard deviation of the subject_id column is 20.024984.
Partition 4:
PHP occurs three times in the fourth partition; all three rows have subject_id 4, so the standard deviation is 0.0.
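Since the standard deviation is the square root of the variance, this agrees with the variance() example above: √401.0 ≈ 20.024984.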
Example 2:
Now, using the same DataFrame, let's partition it based on the technology1 column and get the standard deviation of the age column within each partition.

from pyspark.sql.window import Window
# import the stddev and col functions from pyspark.sql.functions
from pyspark.sql.functions import stddev, col

# partition the DataFrame based on the values in the technology1 column
partition = Window.partitionBy("technology1")

print("----------Partitioned DataFrame----------")
# return the standard deviation of the age column within each partition
dataframe_obj.withColumn("STANDARD DEVIATION", stddev(col("age")).over(partition)).show()
Output:
Explanation:
The total number of partitions is 4.
Partition 1:
.NET occurs two times in the first partition; since both rows have age 22, the standard deviation of the age column is 0.0.
Partition 2:
Hadoop occurs only once in the second partition, so the standard deviation is null.
Partition 3:
Oracle occurs four times in the third partition, and the standard deviation of the age column is about 0.8165.
Partition 4:
PHP occurs three times in the fourth partition, and the standard deviation of the age column is about 1.1547.
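Again, these are the square roots of the variances computed in the earlier example: √0.6667 ≈ 0.8165 and √1.3333 ≈ 1.1547.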
Conclusion
In this PySpark partitioning tutorial, we learned how to return the variance within each partitioned window using the variance() function and the standard deviation within each partitioned window using the stddev() function. In both cases, we added the result to the existing DataFrame as a new column. Make sure to import variance and stddev from the pyspark.sql.functions module and Window from the pyspark.sql.window module.