Apache Spark

PySpark Read.Parquet()

In PySpark, the write.parquet() function writes the DataFrame to a parquet file, and the read.parquet() function reads a parquet file back into a PySpark DataFrame (or any other supported data source). To process columns in Apache Spark quickly and efficiently, the data needs to be compressed. Data compression saves memory, and the values are stored column by column in a flat, columnar layout, which makes column-wise processing fast. The file that stores data in this format is known as a parquet file.

In this guide, we will mainly focus on reading/loading a parquet file into the PySpark DataFrame and into Spark SQL using the read.parquet() function, which is available in the pyspark.sql.DataFrameReader class.

Table of Contents:

Get the Parquet File

Read the Parquet File into the PySpark DataFrame

Read the Parquet File into PySpark SQL

pyspark.sql.DataFrameReader.parquet()

This function is used to read a parquet file and load it into the PySpark DataFrame. It takes the path/file name of the parquet file. We can simply call it as read.parquet() because the SparkSession’s read attribute exposes the generic DataFrameReader.

Syntax:

Let’s see the syntax of read.parquet():

spark_app.read.parquet(file_name.parquet/path)

First, install the PySpark module using the pip command:

pip install pyspark
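Note that the path passed to read.parquet() can be a single parquet file, a directory that contains part files, or several paths at once. Here is a minimal sketch with hypothetical file names (we generate a real parquet file in the next section):

from pyspark.sql import SparkSession

spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# Read a single parquet file
df_single = spark_app.read.parquet("data/file_name.parquet")

# Read every parquet file inside a directory
df_folder = spark_app.read.parquet("data/parquet_folder")

# Read several parquet files into one DataFrame
df_many = spark_app.read.parquet("data/file1.parquet", "data/file2.parquet")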

Get the Parquet File

To read a parquet file, you first need a parquet file that was generated from some data. In this part, we will see how to generate a parquet file from a PySpark DataFrame.

Let’s create a PySpark DataFrame with 5 records and write it to the “industry_parquet” parquet file.

import pyspark

from pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# Create the DataFrame that stores the industry details

industry_df = linuxhint_spark_app.createDataFrame([Row(Type='Agriculture',Area='USA',
Rating='Hot',Total_employees=100),

Row(Type='Agriculture',Area='India',Rating='Hot',Total_employees=200),

Row(Type='Development',Area='USA',Rating='Warm',Total_employees=100),

Row(Type='Education',Area='USA',Rating='Cool',Total_employees=400),

Row(Type='Education',Area='USA',Rating='Warm',Total_employees=20)

])

# Actual DataFrame

industry_df.show()

# Write the industry_df to the parquet file

industry_df.coalesce(1).write.parquet("industry_parquet")

Output:

This is the DataFrame that holds 5 records.

A parquet file is created for the previous DataFrame. The write.parquet("industry_parquet") call actually creates a directory named “industry_parquet”, and the part file inside it is named “part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet”. We use this file throughout the tutorial.
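If you need to regenerate the output, write.parquet() also accepts a write mode and a compression codec; snappy is the default codec, which is why “.snappy” appears in the file name. A minimal sketch (gzip is used here only as an illustration):

# Overwrite any existing output and choose the compression codec explicitly
industry_df.coalesce(1).write.mode("overwrite").parquet("industry_parquet", compression="gzip")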

Read the Parquet File into the PySpark DataFrame

We have the parquet file. Let’s read this file using the read.parquet() function and load it into the PySpark DataFrame.

import pyspark

from pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# Read the parquet file into dataframe_from_parquet object.

dataframe_from_parquet=linuxhint_spark_app.read.parquet("part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet")

# Display the dataframe_from_parquet DataFrame

dataframe_from_parquet.show()

Output:

We display the DataFrame that was created from the parquet file using the show() method.
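Instead of copying the long part-file name, you can also point read.parquet() at the “industry_parquet” directory itself. Because a parquet file stores the schema, the column names and types are recovered automatically; for example:

# Read every part file under the "industry_parquet" directory at once
dataframe_from_directory = linuxhint_spark_app.read.parquet("industry_parquet")

# Print the schema that was read back from the parquet file
dataframe_from_directory.printSchema()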

SQL Queries with Parquet File

After loading the parquet file into the DataFrame, it is possible to create SQL views and display the data that is present in the DataFrame. We need to create a TEMPORARY VIEW and use SQL commands to return the records from the DataFrame which was created from the parquet file.

Example 1:

Create a temporary view named “Sectors” and use the SELECT command to display the records in the DataFrame. You can refer to this tutorial that explains how to create a VIEW in Spark SQL.

import pyspark

from pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# Read the parquet file into dataframe_from_parquet object.

dataframe_from_parquet=linuxhint_spark_app.read.parquet("part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet")

# Create a view named "Sectors" from the above parquet file

dataframe_from_parquet.createOrReplaceTempView("Sectors")

# Query to display all records from the Sectors

linuxhint_spark_app.sql("select * from Sectors").show()

Output:

Example 2:

Using the previous VIEW, write the SQL query:

  1. To display all records from the Sectors that belong to “India”.
  2. To display all records from the Sectors with more than 100 employees.

# Query to display all records from the Sectors belonging to "India".

linuxhint_spark_app.sql("select * from Sectors where Area='India'").show()

# Query to display all records from the Sectors with more than 100 employees

linuxhint_spark_app.sql("select * from Sectors where Total_employees>100").show()

Output:

There is only one record where the area is “India” and two records where the number of employees is greater than 100.
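If you prefer the DataFrame API over SQL, the same two filters can be written with the where() method; a minimal sketch of the equivalent calls:

# Records where the area is "India"
dataframe_from_parquet.where(dataframe_from_parquet.Area == 'India').show()

# Records with more than 100 employees
dataframe_from_parquet.where(dataframe_from_parquet.Total_employees > 100).show()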

Read the Parquet File into PySpark SQL

First, we need to create a VIEW using the CREATE TEMPORARY VIEW command. Using the “path” option within the SQL query, we can read the parquet file into Spark SQL. After the path keyword, we specify the file name/location of the parquet file.

Syntax:

spark_app.sql("CREATE TEMPORARY VIEW view_name USING parquet OPTIONS (path "file_name.parquet")")

Example 1:

Create a temporary view named “Sector2” and read the parquet file into it. Using the sql() function, write the select query to display all records that are present in the view.

import pyspark

from pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# Read the parquet file into Spark SQL

linuxhint_spark_app.sql("CREATE TEMPORARY VIEW Sector2 USING parquet OPTIONS (path "part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet")")

# Query to display all records from the Sector2

linuxhint_spark_app.sql("select * from Sector2").show()

Output:

Example 2:

Use the previous VIEW and write the query to display all records with the rating of “Hot” or “Cool”.

# Query to display all records from Sector2 with the rating "Hot" or "Cool"

linuxhint_spark_app.sql("select * from Sector2 where Rating='Hot' OR Rating='Cool'").show()

Output:

There are three records with the rating of “Hot” or “Cool”.
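The same result can also be obtained with the IN operator instead of chaining OR conditions, for example:

# Query to display all records from Sector2 whose rating is Hot or Cool, using IN
linuxhint_spark_app.sql("select * from Sector2 where Rating IN ('Hot','Cool')").show()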

Conclusion

In PySpark, the write.parquet() function writes the DataFrame to a parquet file, and the read.parquet() function reads a parquet file back into a PySpark DataFrame. We learned how to read the parquet file into the PySpark DataFrame and into a PySpark SQL view. As part of this tutorial, we also discussed how to create temporary views from the PySpark DataFrame and how to filter the data using the WHERE clause.
