PySpark Write CSV

In PySpark, you can save/write an existing DataFrame to CSV. PySpark provides the pyspark.sql.DataFrameWriter.csv() function, which writes the DataFrame to CSV. In this article, we will see how to write the DataFrame to CSV with different options, such as including/excluding the header and setting the delimiter. If you want to append new rows to the CSV or overwrite its existing data, you can use the mode option along with the write function.

Table of Contents:

    1. Write the PySpark DataFrame to CSV Using write.csv()
    2. Write the PySpark DataFrame to CSV Using write.format()
    3. Write the PySpark DataFrame to CSV Using write.option()
    4. Write the PySpark DataFrame to CSV Using write.options()
    5. Appending and Overwriting the Data

In this tutorial, we create a PySpark DataFrame and save it to CSV using the functions mentioned above.

Syntax:

DataFrameWriter.csv(path/file_name, mode=None, sep=None, header=None, ...)

    1. path/file_name: The name of the CSV file. If we specify a path, the CSV is stored at that location.
    2. mode: It is used to overwrite or append the contents of an existing CSV file, as shown in the sketch after this list.
    3. sep: It specifies the character that separates the columns.
    4. header: It is used to include/exclude the column names in the CSV while saving.
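
For a quick illustration, here is a minimal sketch combining these parameters in a single call (the DataFrame df and the output name "out_dir" are hypothetical placeholders, not from the examples below):

# header=True writes the column names, sep=';' changes the delimiter,
# and mode='overwrite' replaces any existing output (all hypothetical values)
df.write.csv("out_dir", mode="overwrite", sep=";", header=True)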

Install the PySpark library in your environment if it is not yet installed.

pip install pyspark
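
To verify the installation, an optional sanity check is to import the package and print its version:

# optional check: confirm that PySpark is importable
import pyspark
print(pyspark.__version__)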


Write the PySpark DataFrame to CSV Using write.csv()

The write.csv() method takes as a parameter the file name/path where the CSV file should be saved.

Syntax:

dataframe_object.coalesce(1).write.csv("file_name")

By default, the CSV is saved as multiple partition files inside the output directory. To get a single CSV file instead, we merge all the partitioned CSV files into one using the coalesce() function. Then, we see only one CSV file with all the rows from the PySpark DataFrame.
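
To confirm the result, here is a small sketch that reads the output directory back into a DataFrame (using the "market_details" output from the example below, after it has been written; Spark picks up every part file inside the directory automatically):

# read the written CSV directory back into a DataFrame and display it
result_df = linuxhint_spark_app.read.csv("market_details")
result_df.show()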

Example:

Consider a PySpark DataFrame with 4 records and 4 columns. Write this DataFrame to a CSV named “market_details”.

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# market data with 4 rows and 4 columns
market =[{'m_id':'mz-001','m_name':'ABC','m_city':'delhi','m_state':'delhi'},
         {'m_id':'mz-002','m_name':'XYZ','m_city':'patna','m_state':'lucknow'},
         {'m_id':'mz-003','m_name':'PQR','m_city':'florida','m_state':'usa'},
         {'m_id':'mz-004','m_name':'ABC','m_city':'delhi','m_state':'lucknow'}
             ]

# create the market dataframe from the above data
market_df = linuxhint_spark_app.createDataFrame(market)

# Actual market data
market_df.show()

# write.csv()
market_df.coalesce(1).write.csv("market_details")

Output:

A directory named “market_details” is created; thanks to coalesce(1), it contains a single part file holding all 4 records (along with Spark’s _SUCCESS marker). Open the part file to see the records.

Write the PySpark DataFrame to CSV Using write.format()

With this method, we can specify the file format, i.e. “csv”. The save() method is used along with the format() method to save the CSV file: format() takes the “csv” file format, and save() takes the file name/path.

Syntax:

dataframe_object.coalesce(1).write.format("csv").save("file_name")

We use the coalesce() function here as well so that the output is a single CSV file.
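
If you skip coalesce(1), Spark writes one part file per partition. As a small sketch, the partition count can be inspected before writing (market_df refers to the DataFrame created in the example below):

# the number of partitions determines how many part files Spark writes
print(market_df.rdd.getNumPartitions())

# coalesce(1) merges them, so the output directory holds a single part file
print(market_df.coalesce(1).rdd.getNumPartitions())  # 1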

Example:

Write the previous PySpark DataFrame to CSV with the file named “market_data1”.

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# market data with 4 rows and 4 columns
market =[{'m_id':'mz-001','m_name':'ABC','m_city':'delhi','m_state':'delhi'},
         {'m_id':'mz-002','m_name':'XYZ','m_city':'patna','m_state':'lucknow'},
         {'m_id':'mz-003','m_name':'PQR','m_city':'florida','m_state':'usa'},
         {'m_id':'mz-004','m_name':'ABC','m_city':'delhi','m_state':'lucknow'}
             ]

# create the market dataframe from the above data
market_df = linuxhint_spark_app.createDataFrame(market)

# Actual market data
market_df.show()

# write.format()
market_df.coalesce(1).write.format("csv").save("market_data1")

Output:

You can see that the 4 records were written to the CSV.

Write the PySpark DataFrame to CSV Using write.option()

We can specify options like the header, the delimiter, etc. along with the write() method. The option() method is used along with write() to set an option, and only one option can be set per option() call.

Syntax:

dataframe_object.coalesce(1).write.option(option_parameter, value).csv("file_name")

An error is raised if we pass multiple key/value pairs to a single option() call; to set several options this way, chain multiple option() calls (as shown in the sketch below) or use write.options().
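
For instance, here is a small sketch that chains two option() calls (df and "out_dir" are hypothetical placeholders):

# each option() call sets exactly one key/value pair;
# chaining several calls sets several options on the same writer
df.coalesce(1).write.option("header", True).option("delimiter", ";").csv("out_dir")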

Example 1: With Header

Write the previous PySpark DataFrame to CSV with the file named “market_withcolumns”.

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# market data with 4 rows and 4 columns
market =[{'m_id':'mz-001','m_name':'ABC','m_city':'delhi','m_state':'delhi'},
         {'m_id':'mz-002','m_name':'XYZ','m_city':'patna','m_state':'lucknow'},
         {'m_id':'mz-003','m_name':'PQR','m_city':'florida','m_state':'usa'},
         {'m_id':'mz-004','m_name':'ABC','m_city':'delhi','m_state':'lucknow'}
             ]

# create the market dataframe from the above data
market_df = linuxhint_spark_app.createDataFrame(market)

# write.option() with header
market_df.coalesce(1).write.option("header", True).csv("market_withcolumns")

CSV:

We specify “True” for the header option, so the column names appear in the first row of this CSV.
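
To check the result programmatically, a small sketch (assuming the same SparkSession) reads the file back with the header enabled:

# header=True tells the reader to treat the first row as column names
check_df = linuxhint_spark_app.read.csv("market_withcolumns", header=True)
check_df.show()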


Example 2: With Delimiter

Write the previous PySpark DataFrame to CSV with the file named “market_1”.

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# market data with 4 rows and 4 columns
market =[{'m_id':'mz-001','m_name':'ABC','m_city':'delhi','m_state':'delhi'},
         {'m_id':'mz-002','m_name':'XYZ','m_city':'patna','m_state':'lucknow'},
         {'m_id':'mz-003','m_name':'PQR','m_city':'florida','m_state':'usa'},
         {'m_id':'mz-004','m_name':'ABC','m_city':'delhi','m_state':'lucknow'}
             ]

# create the market dataframe from the above data
market_df = linuxhint_spark_app.createDataFrame(market)

# write.option() with delimiter
market_df.coalesce(1).write.option("delimiter", ',').csv("market_1")

CSV:

By default, the delimiter is a comma (,), so specifying “,” here produces the standard comma-separated output.
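
To see the option take effect, here is a sketch that writes the same DataFrame with a pipe delimiter instead (the output name "market_pipe" is a hypothetical placeholder):

# with delimiter='|', the columns in the output are separated by pipes
# instead of the default comma
market_df.coalesce(1).write.option("delimiter", '|').csv("market_pipe")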

Write the PySpark DataFrame to CSV Using write.options()

The write.option() method does not allow us to set more than one option per call, but write.options() does. We just pass the option parameters as keyword arguments separated by commas.

Syntax:

dataframe_object.coalesce(1).write.options(option_parameter=value, ...).csv("file_name")

Example: With the Header and Delimiter

Write the previous PySpark DataFrame to CSV with the file named “market_2”.

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# market data with 4 rows and 4 columns
market =[{'m_id':'mz-001','m_name':'ABC','m_city':'delhi','m_state':'delhi'},
         {'m_id':'mz-002','m_name':'XYZ','m_city':'patna','m_state':'lucknow'},
         {'m_id':'mz-003','m_name':'PQR','m_city':'florida','m_state':'usa'},
         {'m_id':'mz-004','m_name':'ABC','m_city':'delhi','m_state':'lucknow'}
             ]

# create the market dataframe from the above data
market_df = linuxhint_spark_app.createDataFrame(market)

# write.options() with delimiter and header
market_df.coalesce(1).write.options(header='True', delimiter=',').csv("market_2")

CSV:

The output CSV includes the header row, with the columns separated by commas.

Appending and Overwriting the Data

It is possible to append data to an existing CSV file using the write.format() method. Using the mode() option, we can either overwrite the data in the CSV file or append to it.

Syntax:

dataframe_object.coalesce(1).write.format("csv").mode('append').save("file_name")
dataframe_object.coalesce(1).write.format("csv").mode('overwrite').save("file_name")

    1. ‘append’ – Appends the data from the PySpark DataFrame to the existing CSV (see the shorthand sketch after this list).
    2. ‘overwrite’ – Removes the existing data from the CSV and replaces it with the new data.
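
As a side note, the mode can also be set directly on the csv() writer without format()/save(); a small sketch (df and "out_dir" are hypothetical placeholders):

# mode() works with csv() as well...
df.coalesce(1).write.mode('append').csv("out_dir")

# ...and csv() also accepts mode as a keyword argument
df.coalesce(1).write.csv("out_dir", mode='overwrite')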

Example 1: Append

Let’s write the previous DataFrame to the “market_1_data” CSV and create a new DataFrame named “market_df2” with 2 rows. Then, append this DataFrame to the previous CSV file.

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# market data with 4 rows and 4 columns
market =[{'m_id':'mz-001','m_name':'ABC','m_city':'delhi','m_state':'delhi'},
         {'m_id':'mz-002','m_name':'XYZ','m_city':'patna','m_state':'lucknow'},
         {'m_id':'mz-003','m_name':'PQR','m_city':'florida','m_state':'usa'},
         {'m_id':'mz-004','m_name':'ABC','m_city':'delhi','m_state':'lucknow'}
             ]

# create the market dataframe from the above data
market_df = linuxhint_spark_app.createDataFrame(market)

# write market_df to csv
market_df.coalesce(1).write.options(header='True', delimiter=',').csv("market_1_data")

# market data with 2 rows and 4 columns
market2=[{'m_id':'mz-006','m_name':'ABC','m_city':'bihar','m_state':'bihar'},
         {'m_id':'mz-007','m_name':'XYZ','m_city':'florida','m_state':'usa'}
             ]

# create the market dataframe from the above market2 data
market_df2 = linuxhint_spark_app.createDataFrame(market2)

# append market_df2
market_df2.coalesce(1).write.format("csv").mode('append').save("market_1_data")

Output:

The first written CSV contains the 4 original records. After appending “market_df2”, two more rows appear in the output.


Example 2: Overwrite

Let’s write the previous DataFrame to the “market_2_data” CSV and create a new DataFrame named “market_df2” with 2 rows. Then, overwrite the first CSV file with the records that are present in “market_df2”.

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# market data with 4 rows and 4 columns
market =[{'m_id':'mz-001','m_name':'ABC','m_city':'delhi','m_state':'delhi'},
         {'m_id':'mz-002','m_name':'XYZ','m_city':'patna','m_state':'lucknow'},
         {'m_id':'mz-003','m_name':'PQR','m_city':'florida','m_state':'usa'},
         {'m_id':'mz-004','m_name':'ABC','m_city':'delhi','m_state':'lucknow'}
             ]

# create the market dataframe from the above data
market_df = linuxhint_spark_app.createDataFrame(market)

# write market_df to csv
market_df.coalesce(1).write.options(header='True', delimiter=',').csv("market_2_data")

# market data with 2 rows and 4 columns
market2=[{'m_id':'mz-006','m_name':'ABC','m_city':'bihar','m_state':'bihar'},
         {'m_id':'mz-007','m_name':'XYZ','m_city':'florida','m_state':'usa'}
             ]

# create the market dataframe from the above market2 data
market_df2 = linuxhint_spark_app.createDataFrame(market2)

# overwrite market_df with market_df2
market_df2.coalesce(1).write.format("csv").mode('overwrite').save("market_2_data")

Output:

The first written CSV contains the 4 original records. After overwriting, only the 2 records from “market_df2” remain in the output.

Conclusion

We learned different ways to write a PySpark DataFrame to CSV with examples. The write.csv() and write.format() methods write/save the PySpark DataFrame to CSV directly by taking the file name/path. If you want to specify options like the header, delimiter, etc., you can go with the write.option()/write.options() methods. The difference between the two is that write.option() allows you to specify only one option at a time, whereas write.options() lets you specify multiple options at once. Lastly, we saw the “append” and “overwrite” modes, using the same DataFrame for both so the difference is clear.
