PySpark Partition a DataFrame
It is possible to partition the rows of a DataFrame using the Window function, which is available in the pyspark.sql.window module. There are two steps to partition the rows of a PySpark DataFrame.
Steps
- Create a PySpark DataFrame with similar values in at least one column.
- Partition the data using partitionBy() method available in the Window function.
Syntax
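A minimal sketch of the partitioning step ("column_name" here is a placeholder for the column to partition by):

from pyspark.sql.window import Window
partition = Window.partitionBy("column_name")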
We can order the partitioned data by the partitioning column or by any other column. Let's create the DataFrame.
Example
Here, we will create a PySpark DataFrame with 10 rows and 5 columns: ['subject_id', 'name', 'age', 'technology1', 'technology2'].
from pyspark.sql import SparkSession

#create a SparkSession
spark_app = SparkSession.builder.appName('_').getOrCreate()

#student data: 10 rows with repeated values in several columns
students = [(4,'sravan',23,'PHP','Testing'),
    (4,'sravan',23,'PHP','Testing'),
    (46,'mounika',22,'.NET','HTML'),
    (4,'deepika',21,'Oracle','HTML'),
    (46,'mounika',22,'Oracle','Testing'),
    (12,'chandrika',22,'Hadoop','C#'),
    (12,'chandrika',22,'Oracle','Testing'),
    (4,'sravan',23,'Oracle','C#'),
    (4,'deepika',21,'PHP','C#'),
    (46,'mounika',22,'.NET','Testing')
]

#create the DataFrame with the 5 columns
dataframe_obj = spark_app.createDataFrame(students, ['subject_id','name','age','technology1','technology2'])
print("----------Actual DataFrame----------")
dataframe_obj.show()
Output:
PySpark sum() Window Function
The sum() window function returns the total of the values in each partition, so it is possible to return a per-partition sum after partitioning the DataFrame.
Syntax
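As a minimal sketch, reusing the partition window spec from above ("column_name" is a placeholder for the column to be summed):

from pyspark.sql.functions import sum, col
dataframe_obj.withColumn("SUM", sum(col("column_name")).over(partition))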
Parameter:
Here, sum() takes a column name as its parameter and returns the sum of the values in that column within each partition. We add the result as a new SUM column using the withColumn() function.
Example 1
We already have the DataFrame created above. Let's partition it based on the technology1 column and get the total sum of the values in the subject_id column in each partition.
from pyspark.sql.window import Window
#import the sum and col from pyspark.sql.functions
from pyspark.sql.functions import sum,col
#partition the dataframe based on the values in technology1 column
partition = Window.partitionBy("technology1")
print("----------Partitioned DataFrame----------")
#return the total sum of values in subject_id column for each partition
dataframe_obj.withColumn("SUM", sum(col("subject_id")).over(partition)).show()
Output:
Explanation
Total partitions: 4
Partition 1:
.NET occurs twice in the first partition, and the sum of the subject_id values is 46+46=92.
Partition 2:
Hadoop occurs once in the second partition, so the sum is 12.
Partition 3:
Oracle occurs four times in the third partition, and the sum of the subject_id values is 4+46+12+4=66.
Partition 4:
PHP occurs three times in the fourth partition, and the sum of the subject_id values is 4+4+4=12.
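These per-partition totals can be cross-checked with an ordinary groupBy aggregation; a minimal sketch:

from pyspark.sql.functions import sum

#group by the partitioning column and sum subject_id per group
dataframe_obj.groupBy("technology1").agg(sum("subject_id").alias("SUM")).show()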
Example 2
We already have the DataFrame created above. Let's partition it based on the technology1 column and get the total sum of the values in the age column in each partition.
from pyspark.sql.window import Window
#import the sum and col from pyspark.sql.functions
from pyspark.sql.functions import sum,col
#partition the dataframe based on the values in technology1 column
partition = Window.partitionBy("technology1")
print("----------Partitioned DataFrame----------")
#return the total sum of values in the age column for each partition
dataframe_obj.withColumn("SUM",sum(col("age")).over(partition)).show()
Output:
Explanation
Total partitions: 4
Partition 1:
.NET occurs twice in the first partition, and the sum of the age values is 22+22=44.
Partition 2:
Hadoop occurs once in the second partition, so the sum is 22.
Partition 3:
Oracle occurs four times in the third partition, and the sum of the age values is 21+22+22+23=88.
Partition 4:
PHP occurs three times in the fourth partition, and the sum of the age values is 23+23+21=67.
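As mentioned earlier, we can also order the partitioned data. Adding orderBy() to the window spec turns the total into a running sum, because the default window frame then ends at the current row (RUNNING_SUM is just a column name chosen here); a minimal sketch:

from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col

#order each partition by age; rows with equal age values share the same running total
ordered_partition = Window.partitionBy("technology1").orderBy("age")
dataframe_obj.withColumn("RUNNING_SUM", sum(col("age")).over(ordered_partition)).show()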
PySpark avg() Window Function
The avg() window function returns the average of the values in each partition, so it is possible to return a per-partition average after partitioning the DataFrame.
Syntax
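As a minimal sketch, reusing the partition window spec from above ("column_name" is a placeholder for the column to be averaged):

from pyspark.sql.functions import avg, col
dataframe_obj.withColumn("AVERAGE", avg(col("column_name")).over(partition))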
Parameter:
Here, avg() takes a column name as its parameter and returns the average of the values in that column within each partition. We add the result as a new AVERAGE column using the withColumn() function.
Example 1
We already have the DataFrame created above. Let's partition it based on the technology1 column and get the average of the values in the subject_id column in each partition.
from pyspark.sql.window import Window
#import the avg and col from pyspark.sql.functions
from pyspark.sql.functions import avg,col
#partition the dataframe based on the values in technology1 column
partition = Window.partitionBy("technology1")
print("----------Partitioned DataFrame----------")
#return the total average of values in the subject_id column for each partition
dataframe_obj.withColumn("AVERAGE", avg(col("subject_id")).over(partition)).show()
Output:
Explanation
Total partitions: 4
Partition 1:
.NET occurs twice in the first partition, and the average of the subject_id values is (46+46)/2=46.0.
Partition 2:
Hadoop occurs once in the second partition, so the average is 12.0.
Partition 3:
Oracle occurs four times in the third partition, and the average of the subject_id values is (4+46+12+4)/4=16.5.
Partition 4:
PHP occurs three times in the fourth partition, and the average of the subject_id values is (4+4+4)/3=4.0.
Example 2
We already have the DataFrame created above. Let's partition it based on the technology1 column and get the average of the values in the age column in each partition.
from pyspark.sql.window import Window
#import the avg and col from pyspark.sql.functions
from pyspark.sql.functions import avg,col
#partition the dataframe based on the values in technology1 column
partition = Window.partitionBy("technology1")
print("----------Partitioned DataFrame----------")
#return the total average of values in the age column for each partition
dataframe_obj.withColumn("AVERAGE", avg(col("age")).over(partition)).show()
Output:
Explanation
Total partitions: 4
Partition 1:
.NET occurs twice in the first partition, and the average of the age values is (22+22)/2=22.0.
Partition 2:
Hadoop occurs once in the second partition, so the average is 22.0.
Partition 3:
Oracle occurs four times in the third partition, and the average of the age values is (21+22+22+23)/4=22.0.
Partition 4:
PHP occurs three times in the fourth partition, and the average of the age values is (23+23+21)/3=22.333.
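The last average has a long decimal expansion; one way to tidy the output is the round() function from pyspark.sql.functions. A minimal sketch:

from pyspark.sql.functions import avg, col, round

#round the per-partition average to 3 decimal places
dataframe_obj.withColumn("AVERAGE", round(avg(col("age")).over(partition), 3)).show()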
Conclusion
In this PySpark partitioning tutorial, we saw how to return the sum of the values in each partitioned window using the sum() function and the average of the values in each partitioned window using the avg() function. We added the result to the existing DataFrame as a new column. It is also possible to get the average with mean(), which works like avg(); just import mean from the pyspark.sql.functions module, the same module that provides sum() and avg().
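For example, a minimal sketch that reuses the window spec from the examples above:

from pyspark.sql.functions import mean, col

#mean() is equivalent to avg() in pyspark.sql.functions
dataframe_obj.withColumn("AVERAGE", mean(col("age")).over(partition)).show()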