PySpark – variance() and stddev() Window Functions

After partitioning the rows of a PySpark DataFrame, it is possible to return the variance and the standard deviation within each partition by applying the variance() and stddev() aggregate functions over a partition window.

First, we will see how to partition the DataFrame in PySpark.

Partition

The rows of a DataFrame can be partitioned using the Window class, which is available in the pyspark.sql.window module. Partitioning a PySpark DataFrame takes two steps.

Steps:

  1. Create a PySpark DataFrame that has repeated values in at least one column.
  2. Partition the data using the partitionBy() method available in the Window function.

Syntax:

partition = Window.partitionBy("column")

We can order the rows within each partition by the partitioning column or any other column, as the sketch below shows.
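For example, a minimal sketch (using the technology1 and subject_id columns from the DataFrame created later in this tutorial):

from pyspark.sql.window import Window

# partition rows by technology1 and sort each partition by subject_id.
# Note: once orderBy() is added, aggregate functions such as variance()
# are evaluated over a running frame (unbounded preceding to current row)
# rather than over the whole partition.
partition = Window.partitionBy("technology1").orderBy("subject_id")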

Let’s create the DataFrame.

Example:

Here, we create a PySpark DataFrame that has 5 columns – ['subject_id','name','age','technology1','technology2'] – with 10 rows.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark_app = SparkSession.builder.appName('_').getOrCreate()

# 10 rows of student data; several values repeat across the columns
students = [(4,'sravan',23,'PHP','Testing'),
            (4,'sravan',23,'PHP','Testing'),
            (46,'mounika',22,'.NET','HTML'),
            (4,'deepika',21,'Oracle','HTML'),
            (46,'mounika',22,'Oracle','Testing'),
            (12,'chandrika',22,'Hadoop','C#'),
            (12,'chandrika',22,'Oracle','Testing'),
            (4,'sravan',23,'Oracle','C#'),
            (4,'deepika',21,'PHP','C#'),
            (46,'mounika',22,'.NET','Testing')]

dataframe_obj = spark_app.createDataFrame(students, ['subject_id','name','age','technology1','technology2'])

print("----------Actual DataFrame----------")
dataframe_obj.show()

 

Output:

PySpark variance() Window Function

The variance() window function is used to return the variance within each partition. Once the DataFrame is partitioned, the variance can be computed for each partition.

Syntax:

dataframe_obj.withColumn("VARIANCE",variance(col("column")).over(partition))

Parameter:

variance(col("column"))

Here, the variance() function takes the column name as its parameter and returns the variance of that column within each partition.

We add the result to a new column named VARIANCE using the withColumn() function.
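Note that variance() computes the sample variance (it is an alias of var_samp()). If the population variance is needed instead, pyspark.sql.functions also provides var_pop(). A minimal sketch, reusing the DataFrame created above:

from pyspark.sql.window import Window
from pyspark.sql.functions import var_pop, col

partition = Window.partitionBy("technology1")

# var_pop() divides by n instead of n - 1, so a single-row partition
# yields 0.0 rather than null
dataframe_obj.withColumn("VAR_POP", var_pop(col("subject_id")).over(partition)).show()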

Example 1:

Let's partition the DataFrame created previously based on the technology1 column and get the variance of the subject_id column in each partition.

# import the Window function
from pyspark.sql.window import Window

# import variance and col from pyspark.sql.functions
from pyspark.sql.functions import variance, col

# partition the DataFrame based on the values in the technology1 column
partition = Window.partitionBy("technology1")

print("----------Partitioned DataFrame----------")

# return the variance of the subject_id column for each partition
dataframe_obj.withColumn("VARIANCE", variance(col("subject_id")).over(partition)).show()

Output:

Explanation:

The total number of partitions is 4.

Partition 1:

.NET occurs twice in the first partition, and both rows hold the same subject_id (46), so the variance of the subject_id column is 0.0.

Partition 2:

Hadoop occurs only once in the second partition. The sample variance is undefined for a single row, so the variance is null.

Partition 3:

Oracle occurs four times in the third partition. The variance of the subject_id column is 401.0.

Partition 4:

PHP occurs three times in the fourth partition, and all three rows hold the same subject_id (4), so the variance of the subject_id column is 0.0.
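As a quick sanity check outside Spark, the same sample-variance figures can be reproduced with Python's built-in statistics module:

import statistics

# subject_id values per partition, read from the DataFrame above
print(statistics.variance([46, 46]))        # .NET partition   -> 0.0
print(statistics.variance([4, 46, 12, 4]))  # Oracle partition -> 401.0
print(statistics.variance([4, 4, 4]))       # PHP partition    -> 0.0
# statistics.variance raises StatisticsError for a single value,
# just as Spark's variance() returns null for a single-row partition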

Example 2:

Let's partition the DataFrame created previously based on the technology1 column and get the variance of the age column in each partition.

# import the Window function
from pyspark.sql.window import Window

# import variance and col from pyspark.sql.functions
from pyspark.sql.functions import variance, col

# partition the DataFrame based on the values in the technology1 column
partition = Window.partitionBy("technology1")

print("----------Partitioned DataFrame----------")

# return the variance of the age column for each partition
dataframe_obj.withColumn("VARIANCE", variance(col("age")).over(partition)).show()

Output:

Explanation:

The total number of partitions is 4.

Partition 1:

.NET occurs twice in the first partition, and both rows hold the same age (22), so the variance of the age column is 0.0.

Partition 2:

Hadoop occurs only once in the second partition. The sample variance is undefined for a single row, so the variance is null.

Partition 3:

Oracle occurs four times in the third partition. The variance of the age column is approximately 0.667.

Partition 4:

PHP occurs three times in the fourth partition. The variance of the age column is approximately 1.333.
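Both variance columns can also be produced in a single pass. A minimal sketch using select() with aliased window expressions:

from pyspark.sql.window import Window
from pyspark.sql.functions import variance, col

partition = Window.partitionBy("technology1")

# add both window variances at once, naming the result columns with alias()
dataframe_obj.select("*",
    variance(col("subject_id")).over(partition).alias("VAR_SUBJECT_ID"),
    variance(col("age")).over(partition).alias("VAR_AGE")).show()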

PySpark Stddev() Window Function

The stddev() window function is used to return the standard deviation within each partition. Once the DataFrame is partitioned, the standard deviation can be computed for each partition.

Syntax:

dataframe_obj.withColumn("STANDARD DEVIATION",stddev(col("column")).over(partition))

Parameter:

stddev(col("column"))

Here, the stddev() function takes the column name as its parameter and returns the standard deviation of that column within each partition.

We add the result to a new column named STANDARD DEVIATION using the withColumn() function.
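Since the standard deviation is the square root of the variance (stddev() is an alias of stddev_samp(), matching variance()/var_samp()), the two window functions agree with each other. A minimal sketch illustrating the relationship:

from pyspark.sql.window import Window
from pyspark.sql.functions import stddev, variance, sqrt, col

partition = Window.partitionBy("technology1")

# the STDDEV and SQRT_VARIANCE columns should hold identical values
dataframe_obj.withColumn("STDDEV", stddev(col("subject_id")).over(partition)) \
             .withColumn("SQRT_VARIANCE", sqrt(variance(col("subject_id")).over(partition))) \
             .show()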

Example 1:

Let's partition the DataFrame created previously based on the technology1 column and get the standard deviation of the subject_id column in each partition.

# import the Window function
from pyspark.sql.window import Window

# import stddev and col from pyspark.sql.functions
from pyspark.sql.functions import stddev, col

# partition the DataFrame based on the values in the technology1 column
partition = Window.partitionBy("technology1")

print("----------Partitioned DataFrame----------")

# return the standard deviation of the subject_id column for each partition
dataframe_obj.withColumn("STANDARD DEVIATION", stddev(col("subject_id")).over(partition)).show()

Output:

Explanation:

The total number of partitions is 4.

Partition 1:

.NET occurs twice in the first partition, and both rows hold the same subject_id (46), so the standard deviation of the subject_id column is 0.0.

Partition 2:

Hadoop occurs only once in the second partition. The sample standard deviation is undefined for a single row, so the standard deviation is null.

Partition 3:

Oracle occurs four times in the third partition. The standard deviation of the subject_id column is 20.024984, the square root of the variance (401.0).

Partition 4:

PHP occurs three times in the fourth partition, and all three rows hold the same subject_id (4), so the standard deviation of the subject_id column is 0.0.

Example 2:

Let's partition the DataFrame created previously based on the technology1 column and get the standard deviation of the age column in each partition.

# import the Window function
from pyspark.sql.window import Window

# import stddev and col from pyspark.sql.functions
from pyspark.sql.functions import stddev, col

# partition the DataFrame based on the values in the technology1 column
partition = Window.partitionBy("technology1")

print("----------Partitioned DataFrame----------")

# return the standard deviation of the age column for each partition
dataframe_obj.withColumn("STANDARD DEVIATION", stddev(col("age")).over(partition)).show()

Output:

Explanation:

The total number of partitions is 4.

Partition 1:

.NET occurs twice in the first partition, and both rows hold the same age (22), so the standard deviation of the age column is 0.0.

Partition 2:

Hadoop occurs only once in the second partition. The sample standard deviation is undefined for a single row, so the standard deviation is null.

Partition 3:

Oracle occurs four times in the third partition. The standard deviation of the age column is approximately 0.8165.

Partition 4:

PHP occurs three times in the fourth partition. The standard deviation of the age column is approximately 1.1547.
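As with variance, these figures can be cross-checked outside Spark with statistics.stdev, which also uses the sample (n - 1) formula:

import statistics

# age values per partition, read from the DataFrame above
print(statistics.stdev([22, 22]))          # .NET partition   -> 0.0
print(statistics.stdev([21, 22, 22, 23]))  # Oracle partition -> 0.816...
print(statistics.stdev([23, 23, 21]))      # PHP partition    -> 1.154...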

Conclusion

In this PySpark partitioning tutorial, we learned how to return the variance in each partitioned window using the variance() function and the standard deviation in each partitioned window using the stddev() function. In both cases, we added the result to the existing DataFrame as a new column. Make sure that you import variance() and stddev() from the pyspark.sql.functions module.
