Apache Spark

PySpark UnionByName()

Merging data in PySpark can be done using the union() and unionAll() functions. However, these functions match columns by position, so they misalign or fail when the two PySpark DataFrames have different column orders or different columns altogether. If you want to merge two DataFrames even when their columns differ, the unionByName() function is the best option. In this tutorial, let’s see how to handle mismatched columns with this function by considering the allowMissingColumns parameter.

Table of Contents:

    1. UnionByName()
    2. UnionByName() with the AllowMissingColumns Parameter

pyspark.sql.DataFrame.unionByName()

The unionByName() function merges two DataFrames based on the column labels rather than the column positions. It takes two parameters.

The first parameter is the other PySpark DataFrame that is merged with the first DataFrame. The second parameter, allowMissingColumns, controls how columns that exist in only one of the DataFrames are handled. By default, it is False; in that case, if the column names are not the same, an “AnalysisException” is raised. If it is set to True, null is placed in the cells of the missing columns. Hence, it is better to include this parameter while using this function.

Syntax:

Let’s see the syntax of unionByName():

firstDataFrame_obj.unionByName(secondDataFrame_obj, allowMissingColumns)

First, install the PySpark module using the pip command.

pip install pyspark


UnionByName()

In this scenario, we merge two PySpark DataFrames, “first_df_obj” and “second_df_obj”, that hold the same column labels, although the columns appear in a different order.

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

first_df =[["Education","Hot","Company-ABC"],
           ["Agriculture","Cool","Company-ABC"],
           ["Internet","Warm","Company-XYZ"],
           ["Banking","Warm","Company-PQR"],
           ["Government","Cool","Company-ABC"]]
           
# create the dataframe using the first_df with columns - ["Sector", "Rating", "Account_Name"]
first_df_obj = linuxhint_spark_app.createDataFrame(first_df,["Sector", "Rating", "Account_Name"])

second_df =[["Warm","Company-ABC","Banking"],
           ["Cool","Company-ABC","Internet"],
           ["Hot","Company-XYZ","Government"]]
           
# create the dataframe using the second_df with columns - ["Rating", "Account_Name", "Sector"]
second_df_obj = linuxhint_spark_app.createDataFrame(second_df,["Rating", "Account_Name", "Sector"])

print("FIRST DF: ")
# First DataFrame
first_df_obj.show()

print("SECOND DF: ")
# Second DataFrame
second_df_obj.show()

print("UNION BY COLUMN NAME: ")
# Join both the DataFrames and display
first_df_obj.unionByName(second_df_obj).show()

Output:


Here, both DataFrames hold the “Sector”, “Rating”, and “Account_Name” columns, so the merge succeeds even though the column order differs: unionByName() aligns the values by label. The merged DataFrame holds 8 records (the first 5 are from the first DataFrame and the last 3 are from the second DataFrame).

Error Scenario:

As we discussed, if the column labels are not the same, an AnalysisException is raised. Let’s create two DataFrames such that the first DataFrame holds the “Sector”, “Rating”, and “Account_Name” columns and the second DataFrame holds the “Rating”, “Account_Name”, and “Pay” columns.

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

first_df =[["Education","Hot","Company-ABC"],
           ["Agriculture","Cool","Company-ABC"],
           ["Internet","Warm","Company-XYZ"],
           ["Banking","Warm","Company-PQR"],
           ["Government","Cool","Company-ABC"]]
           
# create the dataframe using the first_df with columns - ["Sector", "Rating", "Account_Name"]
first_df_obj = linuxhint_spark_app.createDataFrame(first_df,["Sector", "Rating", "Account_Name"])

second_df =[["Warm","Company-ABC",200],
           ["Cool","Company-ABC",100],
           ["Hot","Company-XYZ",500]]
           
# create the dataframe using the second_df with columns - ["Rating", "Account_Name", "Pay"]
second_df_obj = linuxhint_spark_app.createDataFrame(second_df,["Rating", "Account_Name", "Pay"])

first_df_obj.unionByName(second_df_obj).show()

Error:

UnionByName() with the AllowMissingColumns Parameter

As we discussed earlier, if the columns are not the same and we set this parameter to True, the merge allows the missing columns and fills their cells with null.

Consider the DataFrames that are used in the error scenario and specify this parameter:

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

first_df =[["Education","Hot","Company-ABC"],
           ["Agriculture","Cool","Company-ABC"],
           ["Internet","Warm","Company-XYZ"],
           ["Banking","Warm","Company-PQR"],
           ["Government","Cool","Company-ABC"]]
           
# create the dataframe using the first_df with columns - ["Sector", "Rating", "Account_Name"]
first_df_obj = linuxhint_spark_app.createDataFrame(first_df,["Sector", "Rating", "Account_Name"])

second_df =[["Warm","Company-ABC",200],
           ["Cool","Company-ABC",100],
           ["Hot","Company-XYZ",500]]
           
# create the dataframe using the second_df with columns - ["Rating", "Account_Name", "Pay"]
second_df_obj = linuxhint_spark_app.createDataFrame(second_df,["Rating", "Account_Name", "Pay"])

print("FIRST DF: ")
# First DataFrame
first_df_obj.show()

print("SECOND DF: ")
# Second DataFrame
second_df_obj.show()

print("UNION BY COLUMN NAME: ")

# With allowMissingColumns parameter
first_df_obj.unionByName(second_df_obj,allowMissingColumns=True).show()

Output:


The “Pay” column exists only in the second DataFrame, so null is placed in that column for the rows that come from the first DataFrame. Similarly, the “Sector” column exists only in the first DataFrame, so null is placed in that column for the rows that come from the second DataFrame.

Conclusion

The unionByName() function merges the DataFrames based on the column labels rather than the column positions. The specialty of this function is the allowMissingColumns parameter, which handles the differing columns: when it is set to True, null is placed in the cells of the missing columns. Hence, it is better to include this parameter while using this function. We demonstrated the error for better understanding and also learned how to resolve it by passing this parameter.

About the author

Gottumukkala Sravan Kumar

B.Tech (Hons) in Information Technology; known programming languages: Python, R, PHP, MySQL; published 500+ articles in the computer science domain