
PySpark posexplode() and posexplode_outer()

In this article, we will demonstrate PySpark’s posexplode() and posexplode_outer() functions, which pull the elements out of an array column, together with their positions, so that each element can be processed individually.

Before covering these functions, let’s review some prerequisites: the StructType() and StructField() methods, which define the columns of a PySpark DataFrame. With these methods, we can define the column names and the data type of each column. We also need the ArrayType() method, which defines an array column.

StructType()

This method is used to define the structure of the PySpark DataFrame. It accepts a list of StructField objects, each pairing a column name with a data type. This structure is known as the schema of the DataFrame, and it stores a collection of fields.

StructField()

This method is used inside the StructType() method to define a single column of the PySpark DataFrame. It accepts the column name along with its data type.

ArrayType()

This method is used to define an array column in a PySpark DataFrame. It accepts a single data type, the type of the array’s elements; for example, ArrayType(StringType()) defines an array of strings.

In this article, we create a DataFrame with an array column. The DataFrame has 2 columns: Student_category, an integer field that stores student IDs, and Student_full_name, which stores string values in an array created using ArrayType(). After that, we demonstrate posexplode() and posexplode_outer().

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the struct types and data types used in the schema
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,ArrayType

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# consider 5 rows, each holding a student id and an array of letters
my_array_data = [(1, ['A']), (2, ['B','L','B']), (3, ['K','A','K']),(4, ['K']), (3, ['B','P'])]

#define the StructType and StructFields
#for the above data
schema = StructType([StructField("Student_category", IntegerType()),StructField("Student_full_name", ArrayType(StringType()))])

#create the dataframe and add schema to the dataframe
df = spark_app.createDataFrame(my_array_data, schema=schema)

df.show()

Output:

posexplode()

Now, we will see what posexplode() does.

posexplode() returns each individual value from an array. If an array is empty or null, posexplode() skips that row entirely and moves on to the next array in the array type column of the PySpark DataFrame.

It returns two columns:

1st column (pos) contains the position of the value in the array, starting at 0

2nd column (col) contains the value present in the array

This is done with the select() method: inside it, we call the posexplode() function, which is imported from pyspark.sql.functions.
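To make the row-level behaviour concrete without a Spark cluster, here is a plain-Python model. The helper posexplode_rows is hypothetical and is not part of PySpark; it only mimics what posexplode() produces for each row, including the 0-based positions and the skipping of null or empty arrays.

```python
from typing import Iterable, Iterator, List, Optional, Tuple

Array = Optional[List[Optional[str]]]

# Hypothetical helper, NOT part of PySpark: models on plain Python lists
# what posexplode() does to each row of an array column.
def posexplode_rows(arrays: Iterable[Array]) -> Iterator[Tuple[Array, int, Optional[str]]]:
    for arr in arrays:
        # A null (None) or empty array produces no output rows at all.
        if not arr:
            continue
        # One output row per element: (original array, 0-based pos, col).
        for pos, value in enumerate(arr):
            yield arr, pos, value

data = [["A"], ["B", "L", "B"], [], None]
for row in posexplode_rows(data):
    print(row)
```

The empty list and None in data produce nothing, so only four rows are printed.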

Syntax:

dataframe.select(posexplode(array_column))

Parameters:

array_column contains array type values

Return:

It returns every value of every array in the array type column of a PySpark DataFrame as two columns: the first column is the position (pos) of the value within its array, and the second column contains the value itself (col).

Example 1:

In this example, we return every value, along with its position, from the arrays in the Student_full_name column.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the struct types and data types used in the schema
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,ArrayType
#import the posexplode() function
from pyspark.sql.functions import posexplode

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# consider 5 rows, each holding a student id and an array of letters
my_array_data = [(1, ['A']), (2, ['B','L','B']), (3, ['K','A','K']),(4, ['K']), (3, ['B','P'])]

#define the StructType and StructFields
#for the above data
schema = StructType([StructField("Student_category", IntegerType()),StructField("Student_full_name", ArrayType(StringType()))])

#create the dataframe and add schema to the dataframe
df = spark_app.createDataFrame(my_array_data, schema=schema)

# apply posexplode on the Student_full_name column
df.select("Student_full_name",posexplode('Student_full_name')).show()

Output:

We can see that the 2nd column (pos) holds the position of each value within its array, starting at 0, and the 3rd column (col) holds the value itself.

Now, let’s see what happens when the arrays are empty.

Example 2:

In this DataFrame, every array is empty.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the struct types and data types used in the schema
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,ArrayType
#import the posexplode() function
from pyspark.sql.functions import posexplode

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# consider 5 rows, each holding a student id and an empty array
my_array_data = [(1, []), (2, []), (3, []),(4, []), (3, [])]

#define the StructType and StructFields
#for the above data
schema = StructType([StructField("Student_category", IntegerType()),StructField("Student_full_name", ArrayType(StringType()))])

#create the dataframe and add schema to the dataframe
df = spark_app.createDataFrame(my_array_data, schema=schema)

# apply posexplode on the Student_full_name column
df.select("Student_full_name",posexplode('Student_full_name')).show()

Output:

It returns no rows at all, since every array in the Student_full_name column is empty and posexplode() skips such rows; only the column headers (Student_full_name, pos, col) are shown.

posexplode_outer()

Now, we will see what posexplode_outer() does.

posexplode_outer() returns each individual value from an array (col) along with the position of that value in the array (pos). If an array is empty or null, it still returns one row for it, with null in both pos and col, and then moves on to the next array in the array type column of the PySpark DataFrame.

This is done with the select() method: inside it, we call the posexplode_outer() function, which is imported from pyspark.sql.functions.
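As with posexplode(), a plain-Python model can make the behaviour concrete. The helper posexplode_outer_rows is hypothetical and is not part of PySpark; it only mimics the rule that a null or empty array still yields one row with null position and value.

```python
from typing import Iterable, Iterator, List, Optional, Tuple

Array = Optional[List[Optional[str]]]

# Hypothetical helper, NOT part of PySpark: models what posexplode_outer()
# does to each row of an array column.
def posexplode_outer_rows(
    arrays: Iterable[Array],
) -> Iterator[Tuple[Array, Optional[int], Optional[str]]]:
    for arr in arrays:
        if not arr:
            # Null or empty array: the row is kept, with null pos and col.
            yield arr, None, None
            continue
        # Otherwise, one row per element: (original array, 0-based pos, col).
        for pos, value in enumerate(arr):
            yield arr, pos, value

data = [["K"], [], None]
for row in posexplode_outer_rows(data):
    print(row)
```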

Syntax:

dataframe.select(posexplode_outer(array_column))

Parameters:

array_column contains array type values

Return:

It returns every value of every array in the array type column of a PySpark DataFrame, in the pos and col columns. Rows whose array is empty or null are kept, with null in both columns.

The difference between posexplode() and posexplode_outer() is that posexplode() returns nothing for a row whose array is empty or null, whereas posexplode_outer() returns a row with null in both pos and col.

Example 1:

In this example, we return every value, along with its position, from the arrays in the Student_full_name column.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the struct types and data types used in the schema
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,ArrayType
#import the posexplode_outer() function
from pyspark.sql.functions import posexplode_outer

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# consider 5 rows, each holding a student id and an array of letters
my_array_data = [(1, ['A']), (2, ['B','L','B']), (3, ['K','A','K']),(4, ['K']), (3, ['B','P'])]

#define the StructType and StructFields
#for the above data
schema = StructType([StructField("Student_category", IntegerType()),StructField("Student_full_name", ArrayType(StringType()))])

#create the dataframe and add schema to the dataframe
df = spark_app.createDataFrame(my_array_data, schema=schema)

# apply posexplode_outer on the Student_full_name column
df.select("Student_full_name",posexplode_outer('Student_full_name')).show()

Output:

We can see that the 2nd column (pos) holds the position of each value within its array, starting at 0, and the 3rd column (col) holds the value itself. Because none of the arrays are empty or null, the result is the same as with posexplode().

Now, let’s see what happens when the arrays are empty.

Example 2:

In this DataFrame, every array is empty.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the struct types and data types used in the schema
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,ArrayType
#import the posexplode_outer() function
from pyspark.sql.functions import posexplode_outer

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# consider 5 rows, each holding a student id and an empty array
my_array_data = [(1, []), (2, []), (3, []),(4, []), (3, [])]

#define the StructType and StructFields
#for the above data
schema = StructType([StructField("Student_category", IntegerType()),StructField("Student_full_name", ArrayType(StringType()))])

#create the dataframe and add schema to the dataframe
df = spark_app.createDataFrame(my_array_data, schema=schema)

# apply posexplode_outer on the Student_full_name column
df.select("Student_full_name",posexplode_outer('Student_full_name')).show()

Output:

It returns null in both the pos and col columns for every row, since every array in the Student_full_name column is empty.

Conclusion

In this article, we discussed posexplode() and posexplode_outer() applied to an array type column in a DataFrame, with two examples for each. When every array contains values, both functions return the same result. If an array is empty or null, posexplode() skips that row, while posexplode_outer() keeps it and returns null in both the pos and col columns. A null element inside a non-empty array is returned by both functions as a row whose col is null.

About the author

Gottumukkala Sravan Kumar

B tech-hon's in Information Technology; Known programming languages - Python, R, PHP, MySQL; Published 500+ articles in the computer science domain