PySpark Pandas_Udf()

Transforming a PySpark DataFrame is possible using the pandas_udf() function. It is a user-defined function which is applied on the PySpark DataFrame using Apache Arrow, so we can perform vectorized operations with pandas_udf(). It is usually applied as a decorator on a Python function. Let’s dive into this guide to learn the syntax, the parameters, and different examples.

If you want to know about the PySpark DataFrame and module installation, go through this article.

pyspark.sql.functions.pandas_udf()

The pandas_udf() function is available in the pyspark.sql.functions module and can be imported using the “from” keyword. It is used to perform vectorized operations on a PySpark DataFrame. The function is applied like a decorator by passing the return data type as a parameter. Below the decorator, we create a user-defined function that receives and returns the data in vectorized form (a pandas Series), declared with the arrow (->) annotation. Within this function, we return the result.

Structure & Syntax:

First, let’s look at the structure and syntax of this function:

@pandas_udf(datatype)
def function_name(operation) -> convert_format:
    return statement

Here, function_name is the name of our defined function, and datatype specifies the data type that is returned by this function. We return the result using the “return” keyword. All the operations are performed inside the function, and the arrow (->) annotation declares the format (such as a pandas Series) in which the result is returned.
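
To make this structure concrete, here is a minimal, self-contained sketch. It is only an illustration under our own assumptions: the UDF name double_values, the LongType() return type, and the commented-out column name are hypothetical and do not come from the examples later in this guide.

import pandas as panda
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import LongType

# Minimal pandas_udf sketch: the decorator declares the Spark return type,
# and the arrow annotation declares that a pandas Series goes in and comes out.
@pandas_udf(LongType())
def double_values(i: panda.Series) -> panda.Series:
    return i * 2

# Hypothetical usage on a DataFrame with a numeric column named "some_column":
# dataframe.select(double_values("some_column")).show()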

pandas_udf(function, returnType)

  1. The first parameter is the user-defined function that is passed to it.
  2. The second parameter specifies the data type that is returned from the function; a non-decorator sketch of this call form follows this list.
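
The decorator form used throughout this guide is equivalent to calling pandas_udf() directly with these two parameters. The short sketch below is only an illustration under our own assumptions: the UDF name double_quantity is ours, and it presumes a DataFrame with an integer “quantity” column such as the market_df built in the next section.

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

# Non-decorator form: pass the function (here a lambda on a pandas Series)
# and the return type directly to pandas_udf().
double_quantity = pandas_udf(lambda i: i * 2, IntegerType())

# Hypothetical usage once such a DataFrame exists:
# market_df.select("quantity", double_quantity("quantity")).show()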

Data:

In this entire guide, we use only one PySpark DataFrame for demonstration. All the user-defined functions that we define are applied on this PySpark DataFrame. Make sure that you create this DataFrame in your environment after installing PySpark.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
import pandas as panda

linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# vegetable details
vegetable = [{'type': 'vegetable', 'name': 'tomato', 'locate_country': 'USA', 'quantity': 800},
             {'type': 'fruit', 'name': 'banana', 'locate_country': 'CHINA', 'quantity': 20},
             {'type': 'vegetable', 'name': 'tomato', 'locate_country': 'USA', 'quantity': 800},
             {'type': 'vegetable', 'name': 'Mango', 'locate_country': 'JAPAN', 'quantity': 0},
             {'type': 'fruit', 'name': 'lemon', 'locate_country': 'INDIA', 'quantity': 1700},
             {'type': 'vegetable', 'name': 'tomato', 'locate_country': 'USA', 'quantity': 1200},
             {'type': 'vegetable', 'name': 'Mango', 'locate_country': 'JAPAN', 'quantity': 0},
             {'type': 'fruit', 'name': 'lemon', 'locate_country': 'INDIA', 'quantity': 0}]

# create the market dataframe from the above data
market_df = linuxhint_spark_app.createDataFrame(vegetable)
market_df.show()

Output:

Here, we create this DataFrame with 4 columns and 8 rows. Now, we use the pandas_udf() to create the user-defined functions and apply them to these columns.

Pandas_udf() with Different Data Types

In this scenario, we create some user-defined functions with pandas_udf(), apply them on columns, and display the results using the select() method. In each case, we use pandas.Series as we perform the vectorized operations: the column values are treated as a one-dimensional array and the operation is applied to the column as a whole. In the decorator itself, we specify the function’s return type.

Example 1: Pandas_udf() with String Type

Here, we create two user-defined functions with the string return type to convert the string type column values to uppercase and lowercase. Finally, we apply these functions on “type” and “locate_country” columns.

# Convert type column to upper case with pandas_udf
@pandas_udf(StringType())
def type_upper_case(i: panda.Series) -> panda.Series:
    return i.str.upper()

# Convert locate_country column to lowercase with pandas_udf
@pandas_udf(StringType())
def country_lower_case(i: panda.Series) -> panda.Series:
    return i.str.lower()

# Display the columns using select()
market_df.select("type", type_upper_case("type"), "locate_country", country_lower_case("locate_country")).show()

Output:

Explanation:

The StringType() function is available in the pyspark.sql.types module. We already imported this module while creating the PySpark DataFrame.

  1. First, the UDF returns the strings in uppercase using the str.upper() function. The str.upper() function is available on the Series data structure (the column arrives in the function as a pandas Series, as declared with the arrow annotation) and converts the given string to uppercase. Finally, this function is applied to the “type” column which is specified inside the select() method. Previously, all the strings in the “type” column were in lowercase. Now, they are changed to uppercase.
  2. Second, the UDF returns the strings in lowercase using the str.lower() function. The str.lower() function is available on the Series data structure and converts the given string to lowercase. Finally, this function is applied to the “locate_country” column which is specified inside the select() method. Previously, all the strings in the “locate_country” column were in uppercase. Now, they are changed to lowercase.

Example 2: Pandas_udf() with Integer Type

Let’s create a UDF that receives the PySpark DataFrame integer column as a pandas Series and adds 100 to each value. We pass the “quantity” column to this function inside the select() method.

# Add 100
@pandas_udf(IntegerType())
def add_100(i: panda.Series) -> panda.Series:
    return i + 100

# Pass the quantity column to the above function and display.
market_df.select("quantity", add_100("quantity")).show()

Output:

Explanation:

Inside the UDF, the “quantity” column arrives as a pandas Series, and 100 is added to every value in the Series in one vectorized operation. Finally, we pass the “quantity” column to this function and we can see that 100 is added to all the values.
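
As a side note, the same UDF is not limited to select(); it can also be attached as a new column with the DataFrame’s withColumn() method. The column name “quantity_plus_100” in this sketch is our own choice and is not part of the original example.

# Add the UDF result as a new column instead of selecting it.
market_df.withColumn("quantity_plus_100", add_100("quantity")).show()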

Pandas_udf() with Different Data Types Using Groupby() & Agg()

Let’s look at examples that pass the UDF to aggregated columns. Here, the column values are grouped first using the groupby() function and the aggregation is done using the agg() function. We pass our UDF inside this aggregate function.

Syntax:

pyspark_dataframe_object.groupby("grouping_column").agg(UDF(pyspark_dataframe_object['column']))

Here, the values in the grouping column are grouped first. Then, the aggregation is done on each grouped data with respect to our UDF.

Example 1: Pandas_udf() with Aggregate Mean()

Here, we create a user-defined function with a float return type. Inside the function, we calculate the average using the mean() function. This UDF is applied to the “quantity” column to get the average quantity for each type.

# return the mean/average
@pandas_udf("float")
def average_function(i: panda.Series) -> float:
    return i.mean()

# Pass the quantity column to the function by grouping the type column.
market_df.groupby("type").agg(average_function(market_df['quantity'])).show()

Output:

We are grouping based on elements in the “type” column. Two groups are formed – “fruit” and “vegetable”. For each group, the mean is calculated and returned.
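
One optional refinement: the aggregated column in the previous output keeps the generated name “average_function(quantity)”. If a friendlier column name is wanted, the standard alias() method can be chained onto the UDF call; the name “avg_quantity” below is our own choice, not part of the original example.

# Same aggregation, with a readable name for the result column.
market_df.groupby("type").agg(average_function(market_df['quantity']).alias("avg_quantity")).show()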

Example 2: Pandas_udf() with Aggregate Max() and Min()

Here, we create two user-defined functions with the integer (int) return type. The first UDF returns the minimum value and the second UDF returns the maximum value.

# pandas_udf that returns the minimum value
@pandas_udf("int")
def min_(i: panda.Series) -> int:
    return i.min()

# pandas_udf that returns the maximum value
@pandas_udf("int")
def max_(i: panda.Series) -> int:
    return i.max()

# Pass the quantity column to the min_ pandas_udf by grouping locate_country.
market_df.groupby("locate_country").agg(min_(market_df['quantity'])).show()

# Pass the quantity column to the max_ pandas_udf by grouping locate_country.
market_df.groupby("locate_country").agg(max_(market_df['quantity'])).show()

Output:

To return the minimum and maximum values, we use the min() and max() functions inside the UDFs. Then, we group the data by the “locate_country” column. Four groups are formed (“CHINA”, “INDIA”, “JAPAN”, “USA”). For each group, we return the maximum quantity. Similarly, we return the minimum quantity.
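
Since agg() accepts several expressions at once, both UDFs can also be applied in a single call instead of two separate ones. This variant is our own sketch and its output is not shown in the original examples.

# Minimum and maximum quantity per country in one aggregation.
market_df.groupby("locate_country").agg(min_(market_df['quantity']), max_(market_df['quantity'])).show()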

Conclusion

Basically, the pandas_udf() is used to perform vectorized operations on a PySpark DataFrame. We have seen how to create a pandas_udf() and apply it to the PySpark DataFrame. For better understanding, we discussed different examples covering several data types (string, float, and integer). The pandas_udf() can also be used with groupby() through the agg() function.
