Iterate Over Rows And Columns In The PySpark DataFrame

PySpark is the Python API for Apache Spark. It provides Spark-style data processing in Python through the DataFrame.

We can traverse the PySpark DataFrame through rows and columns using:

  1. collect()
  2. select()
  3. iterrows()

Before moving on to these methods, we will create a PySpark DataFrame.

Example:

Here, we create a PySpark DataFrame with 5 rows and 6 columns.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17, 'address':'patna'},
{'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame( students)

#display the dataframe
df.show()

Output:

collect()

This method returns all rows of the given PySpark DataFrame to the driver as a list of Row objects. Used in a for loop, each Row is read by column name, so we can display the values of the specified columns row by row. Because collect() loads the whole DataFrame into the driver's memory, it is best suited to small DataFrames.

Syntax:

for row_iterator in dataframe.collect():
  print(row_iterator['column'], ...)

Where,

  1. dataframe is the input PySpark DataFrame.
  2. column is the column name in the PySpark DataFrame.
  3. row_iterator is the loop variable; on each pass it holds one Row of the DataFrame, from which values are read by column name.

Example 1:

In this example, we iterate over the rows of the above PySpark DataFrame and print the values of the rollno, height and address columns.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17, 'address':'patna'},
{'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame( students)

# iterate over rollno,height and address columns
for row_iterator in df.collect():
   print(row_iterator['rollno'],row_iterator['height'],row_iterator['address'])

Output:

001 5.79 guntur
002 3.79 hyd
003 2.79 patna
004 3.69 hyd
005 5.59 hyd

Example 2:

In this example, we iterate over the rows of the above PySpark DataFrame and print the values of the name column.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17, 'address':'patna'},
{'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame( students)

# iterate over name column
for row_iterator in df.collect():
   print(row_iterator['name'])

Output:

sravan
ojaswi
gnanesh chowdary
rohith
sridevi

select()

This method selects the specified columns from the given PySpark DataFrame and returns them as a new DataFrame. Chained with the collect() method, it returns the rows of only those columns as a list of Row objects.

Syntax:

dataframe.select("column", ...).collect()

Where,

  1. dataframe is the input PySpark DataFrame.
  2. column is the column name in the PySpark DataFrame.

Example 1:

In this example, we retrieve the rows of the rollno and name columns from the above PySpark DataFrame.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17, 'address':'patna'},
{'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame( students)

#iterate rollno and name columns
df.select("rollno", "name").collect()

Output:

[Row(rollno='001', name='sravan'),
Row(rollno='002', name='ojaswi'),
Row(rollno='003', name='gnanesh chowdary'),
Row(rollno='004', name='rohith'),
Row(rollno='005', name='sridevi')]

Example 2:

In this example, we retrieve the rows of the rollno and weight columns from the above PySpark DataFrame.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17, 'address':'patna'},
{'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame( students)

#iterate rollno and weight columns
df.select("rollno", "weight").collect()

Output:

[Row(rollno='001', weight=67),
Row(rollno='002', weight=34),
Row(rollno='003', weight=17),
Row(rollno='004', weight=28),
Row(rollno='005', weight=54)]

iterrows()

iterrows() is a pandas method, not a PySpark one, so the PySpark DataFrame must first be converted to a pandas DataFrame with the toPandas() method. Used in a for loop, iterrows() yields an (index, row) pair for each row, where row is a pandas Series whose values can be read by column position or by column name. Note that toPandas() loads all rows into the driver's memory, so it is best suited to small DataFrames.

Syntax:

for index, row_iterator in dataframe.toPandas().iterrows():
  print(row_iterator.iloc[index_value], ...)

Where,

  1. dataframe is the input PySpark DataFrame.
  2. index_value is the zero-based column position in the converted pandas DataFrame.
  3. row_iterator is the loop variable; on each pass it holds one row as a pandas Series.
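
To show what iterrows() yields, here is a minimal sketch that uses pandas alone. The small DataFrame named pdf is an assumption standing in for the result of dataframe.toPandas(), with its column order set explicitly.

```python
import pandas as pd

# A small pandas DataFrame standing in for dataframe.toPandas().
pdf = pd.DataFrame(
    {'address': ['guntur', 'hyd'], 'age': [23, 16], 'name': ['sravan', 'ojaswi']},
    columns=['address', 'age', 'name'],
)

seen = []
for index, row in pdf.iterrows():
    # row is a pandas Series: read values by column label or by position.
    by_label = row['name']
    by_position = row.iloc[0]
    seen.append((index, by_position, by_label))
    print(index, by_position, by_label)
```

Reading by position with .iloc avoids the ambiguity of a plain integer key, which recent pandas versions no longer treat as a position on a Series with string labels.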

Example 1:

In this example, we iterate over the rows of the above PySpark DataFrame and print the values of the address and age columns, which are at positions 0 and 1.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17, 'address':'patna'},
{'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame( students)

#iterate address and age columns (positions 0 and 1)
for index, row_iterator in df.toPandas().iterrows():
   print(row_iterator.iloc[0], row_iterator.iloc[1])

Output:

guntur 23
hyd 16
patna 7
hyd 9
hyd 37

Example 2:

In this example, we are iterating rows from the address and name columns from the above PySpark DataFrame.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17, 'address':'patna'},
{'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame( students)

#iterate address and name columns
for index, row_iterator in df.toPandas().iterrows():
   print(row_iterator.iloc[0], row_iterator.iloc[3])

Output:

guntur sravan
hyd ojaswi
patna gnanesh chowdary
hyd rohith
hyd sridevi
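
The positions 0 and 3 used above depend on the column order of the converted DataFrame. The sketch below shows one way to look up positions from column names instead of hard-coding them. It assumes the columns are in alphabetical order, as the outputs above suggest, and uses a one-row pandas DataFrame as a stand-in for df.toPandas(); the name positions is chosen here for illustration.

```python
import pandas as pd

# Stand-in for df.toPandas(); the alphabetical column order is an assumption
# inferred from the outputs shown above.
pdf = pd.DataFrame(
    [['guntur', 23, 5.79, 'sravan', '001', 67]],
    columns=['address', 'age', 'height', 'name', 'rollno', 'weight'],
)

# Map each column name to its zero-based position.
positions = {name: pos for pos, name in enumerate(pdf.columns)}
print(positions['address'], positions['name'])

# Use the looked-up positions instead of literal numbers.
for index, row_iterator in pdf.iterrows():
    print(row_iterator.iloc[positions['address']], row_iterator.iloc[positions['name']])
```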

Conclusion

In this tutorial, we discussed how to iterate over rows and columns in the PySpark DataFrame. We covered three methods: collect(), select() and iterrows(), the last used with a for loop after converting to pandas. With these methods, we can choose which columns to read as we go through the rows.

About the author

Gottumukkala Sravan Kumar

B.Tech (Hons) in Information Technology; known programming languages: Python, R, PHP, MySQL; published 500+ articles in the computer science domain