Apache Spark

PySpark – collect()

In Python, PySpark is a Spark module that provides the same kind of data processing as Spark, using DataFrames.

The collect() method in PySpark retrieves all the rows of a DataFrame and returns them to the driver as a list of Row objects, one per row. Because the entire DataFrame is pulled into the driver's memory, collect() should be used with care on large datasets.

Syntax:

dataframe.collect()

Example:

In this example, we will create a PySpark DataFrame with 6 columns and 5 rows and display the dataframe in a tabular format using the show() method.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
               {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
               {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},
               {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
               {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

# display the dataframe in tabular format
df.show()

Output:

+-------+---+------+----------------+------+------+
|address|age|height|            name|rollno|weight|
+-------+---+------+----------------+------+------+
| guntur| 23|  5.79|          sravan|   001|    67|
|    hyd| 16|  3.79|          ojaswi|   002|    34|
|  patna|  7|  2.79|gnanesh chowdary|   003|    17|
|    hyd|  9|  3.69|          rohith|   004|    28|
|    hyd| 37|  5.59|         sridevi|   005|    54|
+-------+---+------+----------------+------+------+
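The show() method also accepts optional arguments, such as the number of rows to print and whether to truncate long values. A minimal sketch, reusing the df created above:

# print only the first two rows, without truncating long strings
df.show(2, truncate=False)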

Example 2:

Let’s display the dataframe using the collect() method.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
               {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
               {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},
               {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
               {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

# display the data returned by collect()
print(df.collect())

Output:

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67),

Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34),

Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17),

Row(address='hyd', age=9, height=3.69, name='rohith', rollno='004', weight=28),

Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]
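Each element of the returned list is a pyspark.sql.Row, whose fields can be read by attribute, by key, or converted to a Python dictionary with asDict(). A minimal sketch, reusing the df created above:

rows = df.collect()
first = rows[0]
print(first.name)       # attribute access
print(first['name'])    # key access
print(first.asDict())   # convert the Row to a Python dict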

We can also use a for loop with the collect() method to iterate over the DataFrame row by row.

Syntax:

for iterator in dataframe.collect():
    print(iterator)

To display particular columns, we specify the column name on the iterator using the [] operator.

Syntax:

for iterator in dataframe.collect():
    print(iterator['column1'], iterator['column2'], ...)

Example:

This example iterates over multiple columns of a dataframe with the collect() method.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
               {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
               {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},
               {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
               {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

# Display name column with for loop
for i in df.collect():
    print(i['name'])

print("------------------------------------")

# Display name and weight columns with for loop
for i in df.collect():
    print(i['name'],i['weight'])

print("------------------------------------")

# Display all columns with for loop
for i in df.collect():
    print(i)

Output:

sravan

ojaswi

gnanesh chowdary

rohith

sridevi

------------------------------------

sravan 67

ojaswi 34

gnanesh chowdary 17

rohith 28

sridevi 54

------------------------------------

Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)

Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34)

Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17)

Row(address='hyd', age=9, height=3.69, name='rohith', rollno='004', weight=28)

Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)
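Collecting only the columns you need is often preferable to collecting whole rows, since a select() before collect() keeps the transferred data small. A minimal sketch, reusing the df created above:

# collect only the name and weight columns
for row in df.select('name', 'weight').collect():
    print(row['name'], row['weight'])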

We can also get a particular Row from the list returned by the collect() method using an index.

In a PySpark DataFrame, indexing starts from 0.

Syntax:

dataframe.collect()[row_index]

Example:

In this example, we collect the first, second, and fifth rows.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
               {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
               {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},
               {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
               {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

#display first row
print(df.collect()[0])

#display second row
print(df.collect()[1])

#display fifth row
print(df.collect()[4])

Output:

Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)

Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34)

Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)
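Note that indexing into collect() still materializes every row on the driver; if only the first few rows are needed, take(n) or head(n) fetch just those rows. A minimal sketch, reusing the df created above:

# fetch only the first two rows instead of collecting everything
print(df.take(2))
# head(n) returns the same list of the first n rows
print(df.head(2))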

We can also get a particular column value from a Row returned by the collect() method using a second index.

In a PySpark DataFrame, indexing starts from 0.

Syntax:

dataframe.collect()[row_index][column_index]

Example:

In this example, we get the value at the first row, first column, and at the third row, first column.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
               {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
               {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},
               {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
               {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

#display first row first column
print(df.collect()[0][0])

#display third row first column
print(df.collect()[2][0])

Output:

guntur

patna
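Positional indexes depend on the column order (alphabetical here, since the DataFrame was built from dictionaries), so accessing fields by name is usually safer. A minimal sketch of the same lookups, reusing the df created above:

# same lookups by column name instead of position
print(df.collect()[0]['address'])
print(df.collect()[2]['address'])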

Conclusion

In this article, we covered the usage of the collect() method in different scenarios. To summarize, the collect() method returns the data of a PySpark DataFrame to the driver, row by row, as a list of Row objects.

About the author

Gottumukkala Sravan Kumar

B.Tech (Hons) in Information Technology; known programming languages: Python, R, PHP, MySQL; published 500+ articles in the computer science domain.