Merging data in PySpark can be done using the union() and unionAll() functions. However, both of these resolve columns by position, so they misalign the data (or fail) when the two PySpark DataFrames have different column orders or different column sets. If you want to merge two DataFrames by matching their column names, the unionByName() function is the best option. In this tutorial, let’s see how to handle mismatched columns with this function by considering the allowMissingColumns parameter.
pyspark.sql.DataFrame.unionByName()
The unionByName() function merges two DataFrames by matching columns on their labels rather than their positions. It takes two parameters.
The first parameter is the other PySpark DataFrame that is merged with the first DataFrame. The second parameter, allowMissingColumns, controls how differing column sets are handled. By default, it is False; in that case, if the column names are not the same in both DataFrames, an AnalysisException is raised. If it is set to True, the columns that are missing from either DataFrame are allowed and their cells are filled with null. Hence, it is better to include this parameter whenever the column sets may differ.
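To see the difference from the position-based union(), consider a minimal sketch (the DataFrame contents and the “UnionContrast” app name are only illustrative):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('UnionContrast').getOrCreate()
# Same columns, different order
df_a = spark.createDataFrame([("Alice", "HR")], ["name", "dept"])
df_b = spark.createDataFrame([("HR", "Bob")], ["dept", "name"])
# union() resolves columns by position: "HR" lands under "name" and "Bob" under "dept"
df_a.union(df_b).show()
# unionByName() resolves columns by label: "Bob" lands under "name" as intended
df_a.unionByName(df_b).show()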
Syntax:
Let’s see the syntax of unionByName():
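dataframe_object1.unionByName(dataframe_object2, allowMissingColumns=False)
Here, dataframe_object1 is the existing PySpark DataFrame and dataframe_object2 is the DataFrame that is merged with it. The allowMissingColumns parameter defaults to False (it is available from Spark 3.1 onward).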
First, install the PySpark module using the pip command.
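pip install pyspark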
unionByName()
In this scenario, we merge two PySpark DataFrames, “first_df_obj” and “second_df_obj”, which hold the same column labels but in a different order.
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()
first_df =[["Education","Hot","Company-ABC"],
["Agriculture","Cool","Company-ABC"],
["Internet","Warm","Company-XYZ"],
["Banking","Warm","Company-PQR"],
["Government","Cool","Company-ABC"]]
# create the dataframe using the first_df with columns - ["Sector", "Rating", "Account_Name"]
first_df_obj = linuxhint_spark_app.createDataFrame(first_df,["Sector", "Rating", "Account_Name"])
second_df =[["Warm","Company-ABC","Banking"],
["Cool","Company-ABC","Internet"],
["Hot","Company-XYZ","Government"]]
# create the dataframe using the second_df with columns - ["Rating", "Account_Name", "Sector"]
second_df_obj = linuxhint_spark_app.createDataFrame(second_df,["Rating", "Account_Name", "Sector"])
print("FIRST DF: ")
# First DataFrame
first_df_obj.show()
print("SECOND DF: ")
# Second DataFrame
second_df_obj.show()
print("UNION BY COLUMN NAME: ")
# Join both the DataFrames and display
first_df_obj.unionByName(second_df_obj).show()
Output:
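Given the rows defined above, the merged DataFrame printed by show() should resemble the following (exact padding may vary):
+-----------+------+------------+
|     Sector|Rating|Account_Name|
+-----------+------+------------+
|  Education|   Hot| Company-ABC|
|Agriculture|  Cool| Company-ABC|
|   Internet|  Warm| Company-XYZ|
|    Banking|  Warm| Company-PQR|
| Government|  Cool| Company-ABC|
|    Banking|  Warm| Company-ABC|
|   Internet|  Cool| Company-ABC|
| Government|   Hot| Company-XYZ|
+-----------+------+------------+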
Here, both DataFrames hold the same three columns, “Sector”, “Rating”, and “Account_Name”, though in a different order. unionByName() matches them by label, so the merged DataFrame holds 8 records: the first 5 from the first DataFrame and the last 3 from the second DataFrame, rearranged to match the first DataFrame’s column order.
Error Scenario:
As we discussed, if the column labels are not the same, an AnalysisException is raised. Let’s create two DataFrames such that the first DataFrame holds the “Sector”, “Rating”, and “Account_Name” columns and the second DataFrame holds the “Rating”, “Account_Name”, and “Pay” columns.
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()
first_df =[["Education","Hot","Company-ABC"],
["Agriculture","Cool","Company-ABC"],
["Internet","Warm","Company-XYZ"],
["Banking","Warm","Company-PQR"],
["Government","Cool","Company-ABC"]]
# create the dataframe using the first_df with columns - ["Sector", "Rating", "Account_Name"]
first_df_obj = linuxhint_spark_app.createDataFrame(first_df,["Sector", "Rating", "Account_Name"])
second_df =[["Warm","Company-ABC",200],
["Cool","Company-ABC",100],
["Hot","Company-XYZ",500]]
# create the dataframe using the second_df with columns - ["Rating", "Account_Name", "Pay"]
second_df_obj = linuxhint_spark_app.createDataFrame(second_df,["Rating", "Account_Name", "Pay"])
first_df_obj.unionByName(second_df_obj).show()
Error:
Running this code raises an AnalysisException because the “Sector” column in the first DataFrame has no counterpart in the second DataFrame (the exact message varies across Spark versions).
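If you prefer to handle this failure gracefully instead of letting the script stop, you can catch the exception. Here is a minimal sketch; in Spark 3.x the class is importable from pyspark.sql.utils, and newer releases also expose it as pyspark.errors.AnalysisException:
from pyspark.sql.utils import AnalysisException
try:
    first_df_obj.unionByName(second_df_obj).show()
except AnalysisException as error:
    # Fails because "Sector" in the first DataFrame has no match in the second
    print("unionByName failed:", error)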
unionByName() with the allowMissingColumns Parameter
As we discussed earlier, if the columns are not the same, setting this parameter to True allows the missing columns, but their values are set to null.
Consider the DataFrames that are used in the error scenario and specify this parameter:
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()
first_df =[["Education","Hot","Company-ABC"],
["Agriculture","Cool","Company-ABC"],
["Internet","Warm","Company-XYZ"],
["Banking","Warm","Company-PQR"],
["Government","Cool","Company-ABC"]]
# create the dataframe using the first_df with columns - ["Sector", "Rating", "Account_Name"]
first_df_obj = linuxhint_spark_app.createDataFrame(first_df,["Sector", "Rating", "Account_Name"])
second_df =[["Warm","Company-ABC",200],
["Cool","Company-ABC",100],
["Hot","Company-XYZ",500]]
# create the dataframe using the second_df with columns - ["Rating", "Account_Name", "Pay"]
second_df_obj = linuxhint_spark_app.createDataFrame(second_df,["Rating", "Account_Name", "Pay"])
print("FIRST DF: ")
# First DataFrame
first_df_obj.show()
print("SECOND DF: ")
# Second DataFrame
second_df_obj.show()
print("UNION BY COLUMN NAME: ")
# With allowMissingColumns parameter
first_df_obj.unionByName(second_df_obj,allowMissingColumns=True).show()
Output:
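With allowMissingColumns set to True, the merged DataFrame should resemble the following (recent Spark versions print NULL instead of null):
+-----------+------+------------+----+
|     Sector|Rating|Account_Name| Pay|
+-----------+------+------------+----+
|  Education|   Hot| Company-ABC|null|
|Agriculture|  Cool| Company-ABC|null|
|   Internet|  Warm| Company-XYZ|null|
|    Banking|  Warm| Company-PQR|null|
| Government|  Cool| Company-ABC|null|
|       null|  Warm| Company-ABC| 200|
|       null|  Cool| Company-ABC| 100|
|       null|   Hot| Company-XYZ| 500|
+-----------+------+------------+----+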
The “Pay” column exists in the second DataFrame but not in the first DataFrame, so null is placed in “Pay” for all rows that came from the first DataFrame. Similarly, the “Sector” column exists in the first DataFrame but not in the second DataFrame, so null is placed in “Sector” for all rows that came from the second DataFrame.
Conclusion
The unionByName() function merges DataFrames based on their column labels, not their positions. Its specialty is the allowMissingColumns parameter, which handles differing column sets: when it is set to True, null is placed in the cells of the columns that are missing from either DataFrame. Hence, it is better to include this parameter while using this function. We demonstrated the error for better understanding and also learned how to resolve it by passing this parameter.