Apache Spark

PySpark where Clause

In Python, PySpark is a Spark module that provides Spark-like data processing through the DataFrame API.

In PySpark, where() is used to filter the rows in the DataFrame,

It will return the new dataframe by filtering the rows in the existing dataframe.

Let’s create a PySpark DataFrame

Example:

This example will create the PySpark DataFrame with 5 rows and 6 columns and display it using the show() method.

# import the pyspark module
import pyspark
# import SparkSession for creating a session
from pyspark.sql import SparkSession
# import the col function
from pyspark.sql.functions import col

# create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# student data: 5 rows, each a dictionary with 6 attributes
students = [
    {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
    {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
    {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
    {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
    {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'},
]

# build the dataframe from the row dictionaries
df = spark_app.createDataFrame(students)

# display the dataframe
df.show()

Output:

Capture.PNG

Method – 1 : where() with relational operators

Here, we use the where() function to filter the PySpark DataFrame with relational operators such as >, <, ==, etc.

Syntax:

Dataframe.where(dataframe.column_name operator value)

Here, the condition passed to where() is made up of three parts.

  1. dataframe.column_name is the column where filtering is applied
  2. operator is the relational operator
  3. value is the string/numeric value compared with column values

Example:

In this example, we will filter the dataframe based on age, height, weight, and name columns with different relational operators and display the filtered rows using the collect() method.

# import the pyspark module
import pyspark
# import SparkSession for creating a session
from pyspark.sql import SparkSession
# import the col function
from pyspark.sql.functions import col

# create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# student data: 5 rows, each a dictionary with 6 attributes
students = [
    {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
    {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
    {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
    {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
    {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'},
]

# build the dataframe from the row dictionaries
df = spark_app.createDataFrame(students)

# filter conditions built with relational operators on DataFrame columns:
# age equal to 23, height equal to 2.79, weight greater than 30,
# and name equal to 'sravan'
conditions = [
    df.age == 23,
    df.height == 2.79,
    df.weight > 30,
    df.name == 'sravan',
]

# apply each filter and print the matching rows,
# with a divider line between results
for index, condition in enumerate(conditions):
    print(df.where(condition).collect())
    if index < len(conditions) - 1:
        print("---------------------------")

Output:

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

---------------------------

[Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

Method – 2 : where() with relational operators using col function

Here, we use the where() function to filter the PySpark DataFrame with relational operators such as >, <, ==, etc. through the col function.

Syntax:

Dataframe.where(col(“column_name”) operator value)

Here, the condition passed to where() is made up of three parts.

  1. dataframe.column_name is the column where filtering is applied
  2. operator is the relational operator
  3. value is the string/numeric value compared with column values

Example:

In this example, we will filter the dataframe based on age, height, weight, and name columns with different relational operators and display the filtered rows using the collect() method.

# import the pyspark module
import pyspark
# import SparkSession for creating a session
from pyspark.sql import SparkSession
# import the col function
from pyspark.sql.functions import col

# create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# student data: 5 rows, each a dictionary with 6 attributes
students = [
    {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
    {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
    {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
    {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
    {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'},
]

# build the dataframe from the row dictionaries
df = spark_app.createDataFrame(students)

# the same relational filters as before, but expressed through col():
# age equal to 23, height equal to 2.79, weight greater than 30,
# and name equal to 'sravan'
conditions = [
    col('age') == 23,
    col('height') == 2.79,
    col('weight') > 30,
    col('name') == 'sravan',
]

# apply each filter and print the matching rows,
# with a divider line between results
for index, condition in enumerate(conditions):
    print(df.where(condition).collect())
    if index < len(conditions) - 1:
        print("---------------------------")

Output:

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

---------------------------

[Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

Method – 3 : where() with relational operators on multiple conditions

Here, we use the where() function to filter the PySpark DataFrame with relational operators such as >, <, ==, etc. combined into multiple conditions.

Syntax:

Dataframe.where((condition1) operator (condition2) ...)

Here, the condition passed to where() is made up of the following parts.

  1. dataframe.column_name is the column where filtering is applied
  2. operator is the relational operator
  3. value is the string/numeric value compared with column values
  4. condition specifies – ( dataframe.column_name operator value)

Example:

In this example, we are going to filter the dataframe based on the age column with or(|) , and (&) operator and display the filtered rows using the collect() method.

#import the pyspark module
import pyspark
#import SparkSession for creating a session
from pyspark.sql import SparkSession
#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
               {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
               {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},
               {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
               {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

#display dataframe
#by filtering age greater than 10 AND less than 21
#(& requires both conditions to hold; each condition must be parenthesized)
print(df.where((df.age > 10) & (df.age < 21)).collect())

print("---------------------------")

#display dataframe
#by filtering age greater than 10 OR less than 21
#(| requires at least one condition to hold, so every row matches here)
print(df.where((df.age > 10) | (df.age < 21)).collect())

Output:

[Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17), Row(address='hyd', age=9, height=3.69, name='rohith', rollno='004', weight=28), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]

Method – 4 : where() with SQL Expression

Here, we use the where() function to filter the PySpark DataFrame with an SQL expression. The condition is enclosed in double quotes (""), and relational operators can be used inside it.

Syntax:

Dataframe.where(“dataframe.column_name operator value”)

Here, where() accepts three parameters.

  1. dataframe.column_name is the column where filtering is applied
  2. operator is the relational operator
  3. value is the string/numeric value compared with column values

Example:

In this example, we will filter the dataframe based on age, height, weight, and name columns with different relational operators and display the filtered rows using the collect() method.

# import the pyspark module
import pyspark
# import SparkSession for creating a session
from pyspark.sql import SparkSession
# import the col function
from pyspark.sql.functions import col

# create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# student data: 5 rows, each a dictionary with 6 attributes
students = [
    {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
    {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
    {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
    {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
    {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'},
]

# build the dataframe from the row dictionaries
df = spark_app.createDataFrame(students)

# the same four filters, this time written as SQL expression strings:
# age equal to 23, height equal to 2.79, weight greater than 30,
# and name equal to 'sravan'
expressions = [
    "age == 23",
    "height == 2.79",
    "weight >30",
    "name == 'sravan'",
]

# apply each SQL-expression filter and print the matching rows,
# with a divider line between results
for index, expression in enumerate(expressions):
    print(df.where(expression).collect())
    if index < len(expressions) - 1:
        print("---------------------------")

Output:

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

---------------------------

[Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

Method – 5 : where() with string methods

Here, we use the where() function to filter the PySpark DataFrame with string methods. These are applied to columns whose datatype is string. They are:

startswith() – check the starting character in the given data

Syntax:

Dataframe.where(dataframe.column_name.startswith("character"))

Here,

column_name is the column where filtering is applied where the value starts with the given character.

endswith() – check the ending character in the given data

Syntax:

Dataframe.where(dataframe.column_name.endswith("character"))

Here,

column_name is the column where filtering is applied where the value ends with the given character.

contains() – check the character contains in the given data

Syntax:

Dataframe.where(dataframe.column_name.contains("character"))

Here,

column_name is the column where filtering is applied where the value contains the given character.

Example:

In this example, we are going to filter based on address column

# import the pyspark module
import pyspark
# import SparkSession for creating a session
from pyspark.sql import SparkSession
# import the col function
from pyspark.sql.functions import col

# create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# student data: 5 rows, each a dictionary with 6 attributes
students = [
    {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
    {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
    {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
    {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
    {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'},
]

# build the dataframe from the row dictionaries
df = spark_app.createDataFrame(students)

# string-method filters on the address column:
# rows whose address starts with 'h', ends with 'r', or contains 'r'
conditions = [
    df.address.startswith("h"),
    df.address.endswith("r"),
    df.address.contains("r"),
]

# apply each string filter and print the matching rows,
# with a divider line between results
for index, condition in enumerate(conditions):
    print(df.where(condition).collect())
    if index < len(conditions) - 1:
        print("-----------------")

Output:

[Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=9, height=3.69, name='rohith', rollno='004', weight=28), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]

-----------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

-----------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

Conclusion

This article discusses how to use a where() filter with several methods on the PySpark dataframe in Python. And we included all the conditions with SQL expressions too.

About the author

Gottumukkala Sravan Kumar

B tech-hon's in Information Technology; Known programming languages - Python, R , PHP MySQL; Published 500+ articles on computer science domain