Apache Spark

PySpark shuffle() & sort_array()

PySpark is the Python module for Apache Spark, providing Spark-style data processing through the DataFrame API.

In this article, we will explain the PySpark functions shuffle() and sort_array(), which work with the data inside arrays.

For this example, we also need the StructType() and StructField() methods, which are used to define the columns of a PySpark DataFrame. With these methods, we can set the column names and the data types of the particular columns. We will also use ArrayType() to create an array structure.

Let’s discuss them one by one.

StructType()

This method is used to define the structure of the PySpark DataFrame. It accepts a list of data types together with column names for the given DataFrame. This is known as the schema of the DataFrame. It stores a collection of fields.

StructField()

This method is used inside the StructType() method of the PySpark DataFrame. It accepts a column name together with its data type.

ArrayType()

This method is used to define an array column of the PySpark DataFrame. It accepts the data type of the array elements, which is placed inside ArrayType(), e.g. ArrayType(StringType()). It stores a collection of values of that type.

So, in this article, we will create a DataFrame with an array column. Let’s create a DataFrame with 2 columns. The first column, Student_category, is an integer field that stores student IDs, and the second column, Student_full_name, stores string values in an array created using ArrayType().

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#and import struct types and other data types
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,FloatType,ArrayType

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

#data with 5 rows; each row holds a student id and an array of strings
my_array_data = [(1, ['A']), (2, ['B','L','B']), (3, ['K','A','K']), (4, ['K']), (3, ['B','P'])]

#define the StructType and StructFields
#for the above data
schema = StructType([
    StructField("Student_category", IntegerType()),
    StructField("Student_full_name", ArrayType(StringType()))
])

#create the dataframe and add schema to the dataframe
df = spark_app.createDataFrame(my_array_data, schema=schema)

df.show()

Output:

shuffle()

shuffle() is used to shuffle the values in an array for all rows in an array-type column of the PySpark DataFrame. It returns a new array with the values shuffled. It takes the array-type column name as a parameter. Note that it shuffles randomly, so the result is non-deterministic.

Syntax:

shuffle(array_column)

Parameter:

array_column is the name of the array-type column.

The shuffle() function is used with the select() method to perform the action.

Return:

It will return the array with shuffled values.

Example

In this example, we will use the shuffle() function to shuffle the values present in the Student_full_name column.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#and import struct types and other data types
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,FloatType,ArrayType
from pyspark.sql.functions import shuffle

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

#data with 5 rows; each row holds a student id and an array of strings
my_array_data = [(1, ['A']), (2, ['B','L','B']), (3, ['K','A','K']), (4, ['K']), (3, ['B','P'])]

#define the StructType and StructFields
#for the above data
schema = StructType([
    StructField("Student_category", IntegerType()),
    StructField("Student_full_name", ArrayType(StringType()))
])

#create the dataframe and add schema to the dataframe
df = spark_app.createDataFrame(my_array_data, schema=schema)

#shuffle the array values in Student_full_name column
df.select("Student_full_name",shuffle("Student_full_name")).show()

Output:

You can see in the second column that the values in each array were shuffled.

sort_array()

sort_array() is used to sort the elements in an array. It takes two parameters: the first is the array-type column name, and the second, asc, takes a boolean value. If asc is True, the array is sorted in ascending order; if it is False, the array is sorted in descending order. By default, the array is sorted in ascending order.

Syntax:

sort_array(array_column,asc=True/False)

Parameters:

  1. array_column is the name of the array-type column.
  2. asc takes a boolean value to sort the array in ascending or descending order.

The sort_array() function is used with the select() method to perform the action.

Return:

It returns a sorted array for all rows in an array-type column.

Example 1

In this example, we will sort the Student_full_name column in ascending order.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#and import struct types and other data types
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,FloatType,ArrayType
from pyspark.sql.functions import sort_array

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

#data with 5 rows; each row holds a student id and an array of strings
my_array_data = [(1, ['A']), (2, ['B','L','B']), (3, ['K','A','K']), (4, ['K']), (3, ['B','P'])]

#define the StructType and StructFields
#for the above data
schema = StructType([
    StructField("Student_category", IntegerType()),
    StructField("Student_full_name", ArrayType(StringType()))
])

#create the dataframe and add schema to the dataframe
df = spark_app.createDataFrame(my_array_data, schema=schema)

#sort the array values in Student_full_name column in ascending order
df.select("Student_full_name",sort_array("Student_full_name",asc=True)).show()

Output:

So in the second column, you can see that the array is sorted in ascending order for all rows.

Example 2

In this example, we will sort the Student_full_name column in descending order.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#and import struct types and other data types
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,FloatType,ArrayType
from pyspark.sql.functions import sort_array

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

#data with 5 rows; each row holds a student id and an array of strings
my_array_data = [(1, ['A']), (2, ['B','L','B']), (3, ['K','A','K']), (4, ['K']), (3, ['B','P'])]

#define the StructType and StructFields
#for the above data
schema = StructType([
    StructField("Student_category", IntegerType()),
    StructField("Student_full_name", ArrayType(StringType()))
])

#create the dataframe and add schema to the dataframe
df = spark_app.createDataFrame(my_array_data, schema=schema)

#sort the array values in Student_full_name column in descending order
df.select("Student_full_name",sort_array("Student_full_name",asc=False)).show()

Output:

So in the second column, you can see that the array is sorted in descending order for all rows.

Conclusion

In this PySpark article, we covered two different array functions. shuffle() is used to randomly shuffle the values in an array-type column. We then discussed sort_array(), which is used to sort the arrays in an array-type column; if you want to sort an array in descending order, set the asc parameter to False.

About the author

Gottumukkala Sravan Kumar

B.Tech (Hons) in Information Technology; known programming languages: Python, R, PHP, MySQL; published 500+ articles in the computer science domain.