
PySpark – Filter()

In Python, PySpark is a Spark module that provides Spark-like processing through DataFrames. In PySpark, filter() is used to filter the rows in a DataFrame.

It returns a new DataFrame containing only the rows of the existing DataFrame that satisfy the given condition.
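Because filter() does not modify the original DataFrame, the result is normally assigned to a new variable. A minimal sketch, assuming a DataFrame df that has an age column:

#filter() returns a new dataframe; df itself is unchanged
adults = df.filter(df.age > 18)
adults.show()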

Let’s create a PySpark DataFrame.

Example:

In this example, we are going to create a PySpark DataFrame with 5 rows and 6 columns and display it using the show() method.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

#import the col function

from pyspark.sql.functions import col

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create student data with 5 rows and 6 attributes

students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},

 {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},

 {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},

 {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},

 {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

 

# create the dataframe

df = spark_app.createDataFrame(students)

 

#display dataframe

df.show()

Output:
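+-------+---+------+----------------+------+------+
|address|age|height|            name|rollno|weight|
+-------+---+------+----------------+------+------+
| guntur| 23|  5.79|          sravan|   001|    67|
|    hyd| 16|  3.79|          ojaswi|   002|    34|
|  patna|  7|  2.79|gnanesh chowdary|   003|    17|
|    hyd|  9|  3.69|          rohith|   004|    28|
|    hyd| 37|  5.59|         sridevi|   005|    54|
+-------+---+------+----------------+------+------+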

Method – 1: filter() with Relational Operators

Here, we are using the filter() function to filter the PySpark DataFrame with relational operators such as >, <, ==, etc.

Syntax:

dataframe.filter(dataframe.column_name   operator   value)

Here, the condition passed to filter() is built from three parts.

  1. dataframe.column_name is the column where filtering is applied.
  2. operator is the relational operator.
  3. value is the string/numeric value compared with column values.
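Note that filter() is a lazy transformation: no rows are actually filtered until an action such as collect() or show() runs. A minimal sketch, assuming the df created above:

#nothing is computed at this line
minors = df.filter(df.age < 18)
#collect() is the action that triggers the filter and returns the rows
print(minors.collect())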

Example:

In this example, we are going to filter the dataframe based on age, height, weight, and name columns with different relational operators and display the filtered rows using the collect() method.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

#import the col function

from pyspark.sql.functions import col

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create student data with 5 rows and 6 attributes

students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},

 {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},

 {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},

 {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},

 {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

 

# create the dataframe

df = spark_app.createDataFrame(students)

 

#display dataframe

#by filtering age as 23

print(df.filter(df.age == 23).collect())

 

print("---------------------------")

 

#display dataframe

#by filtering height as 2.79

print(df.filter(df.height == 2.79).collect())

 

print("---------------------------")

 

#display dataframe

#by filtering weight greater than 30

print(df.filter(df.weight > 30).collect())

 

print("---------------------------")

 

#display dataframe

#by filtering name as sravan

print(df.filter(df.name == 'sravan').collect())

Output:

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

---------------------------

[Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

Method – 2: filter() with Relational Operators Using the col() Function

Here, we are using the filter() function to filter the PySpark DataFrame with relational operators such as >, <, ==, etc., through the col() function.

Syntax:

dataframe.filter(col("column_name")   operator   value)

Here, the condition passed to filter() is again built from three parts. (Note that where() is an alias of filter(), so the two are interchangeable.)

  1. col("column_name") is the column where filtering is applied.
  2. operator is the relational operator.
  3. value is the string/numeric value compared with column values.
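The col() form is useful when the column name is stored in a variable rather than written literally. A minimal sketch, assuming the df created above; the column_to_check variable is hypothetical:

from pyspark.sql.functions import col
#hypothetical variable holding a column name
column_to_check = 'age'
print(df.filter(col(column_to_check) >= 18).collect())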

Example:

In this example, we are going to filter the dataframe based on age, height, weight, and name columns with different relational operators and display the filtered rows using the collect() method.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

#import the col function

from pyspark.sql.functions import col

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create student data with 5 rows and 6 attributes

students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},

 {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},

 {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},

 {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},

 {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

 

# create the dataframe

df = spark_app.createDataFrame(students)

 

#display dataframe

#by filtering age as 23

print(df.filter(col('age') == 23).collect())

 

print("---------------------------")

 

#display dataframe

#by filtering height as 2.79

print(df.filter(col('height') == 2.79).collect())

 

print("---------------------------")

 

#display dataframe

#by filtering weight greater than 30

print(df.filter(col('weight') > 30).collect())

 

print("---------------------------")

 

#display dataframe

#by filtering name as sravan

print(df.filter(col('name') == 'sravan').collect())

Output:

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

---------------------------

[Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

Method – 3: filter() with Relational Operators on Multiple Conditions

Here, we are using the filter() function to filter the PySpark DataFrame by combining multiple conditions built from relational operators such as >, <, and ==.

Syntax:

dataframe.filter((condition1) operator (condition2) …)

Here, filter() accepts a combined condition built from:

  1. condition – an expression of the form (dataframe.column_name operator value), where operator is a relational operator and value is the string/numeric value compared with the column.
  2. logical operator – & (and) or | (or), used to join the conditions.

Each condition must be wrapped in its own parentheses, because & and | bind more tightly than the comparison operators in Python.
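A condition can also be inverted with the not (~) operator. A minimal sketch, assuming the df created above:

#keep the rows where age is NOT between 10 and 21
print(df.filter(~((df.age > 10) & (df.age < 21))).collect())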

Example:

In this example, we are going to filter the dataframe based on the age column with the or (|) and and (&) operators and display the filtered rows using the collect() method.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

#import the col function

from pyspark.sql.functions import col

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create student data with 5 rows and 6 attributes

students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},

 {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},

 {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},

 {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},

 {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

 

# create the dataframe

df = spark_app.createDataFrame(students)

 

#display dataframe

#by filtering age greater than 10 and less than 21

print(df.filter((df.age > 10) & (df.age < 21)).collect())

 

print("---------------------------")

 

#display dataframe

#by filtering age greater than 10 or less than 21

print(df.filter((df.age > 10) | (df.age < 21)).collect())

Output:

[Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17), Row(address='hyd', age=9, height=3.69, name='rohith', rollno='004', weight=28), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]

Method – 4: filter() with SQL Expression

Here, we are using the filter() function to filter the PySpark DataFrame with a SQL expression. The condition is passed as a string enclosed in quotes, and relational operators can be used inside it.

Syntax:

dataframe.filter("column_name   operator   value")

Here, the condition inside the SQL expression is built from three parts.

  1. column_name is the column where filtering is applied.
  2. operator is the relational operator.
  3. value is the string/numeric value compared with column values.
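A SQL expression can also join several conditions with SQL's own AND and OR keywords. A minimal sketch, assuming the df created above:

#rows where age is greater than 10 and weight is greater than 30, in one SQL string
print(df.filter("age > 10 AND weight > 30").collect())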
Example:
In this example, we are going to filter the dataframe based on age, height, weight, and name columns with different relational operators and display the filtered rows using the collect() method.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

#import the col function

from pyspark.sql.functions import col

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create student data with 5 rows and 6 attributes

students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},

 {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},

 {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},

 {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},

 {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

 

# create the dataframe

df = spark_app.createDataFrame(students)

 

#display dataframe

#by filtering age as 23

#using sql expression

print(df.filter("age == 23").collect())

 

print("---------------------------")

 

#display dataframe

#by filtering height as 2.79

#using sql expression

print(df.filter("height == 2.79").collect())

 

print("---------------------------")

 

#display dataframe

#by filtering weight greater than 30

#using sql expression

print(df.filter("weight >30").collect())

 

print("---------------------------")

 

#display dataframe

#by filtering name as sravan

#using sql expression

print(df.filter("name == 'sravan'").collect())

Output:

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

---------------------------

[Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]

---------------------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

Method – 5: filter() with String Methods

Here, we are using the filter() function to filter the PySpark DataFrame with string methods. These are applied to columns whose datatype is string. They are:

startswith() – checks whether the column value starts with the given character(s).

Syntax:

dataframe.filter(dataframe.column_name.startswith("character"))

Here,

column_name is the column being filtered; a row is kept when its value starts with the given character.

endswith() – checks whether the column value ends with the given character(s).

Syntax:

dataframe.filter(dataframe.column_name.endswith("character"))

Here,

column_name is the column being filtered; a row is kept when its value ends with the given character.

contains() – checks whether the column value contains the given character(s).

Syntax:

dataframe.filter(dataframe.column_name.contains("character"))

Here,

column_name is the column being filtered; a row is kept when its value contains the given character(s).
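These string methods return boolean column expressions, so they can also be combined with the logical operators from Method 3. A minimal sketch, assuming the df created above:

#addresses that start with 'h' or end with 'r'
print(df.filter(df.address.startswith("h") | df.address.endswith("r")).collect())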

Example:

In this example, we are going to filter based on the address column.

#import the pyspark module

import pyspark

#import SparkSession for creating a session

from pyspark.sql import SparkSession

#import the col function

from pyspark.sql.functions import col

 

#create an app named linuxhint

spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

 

# create student data with 5 rows and 6 attributes

students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},

 {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},

 {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},

 {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},

 {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

 

# create the dataframe

df = spark_app.createDataFrame(students)

 

#display dataframe

#by filtering address

#starts with 'h'

print(df.filter(df.address.startswith("h")).collect())

 

print("-----------------")

 

#display dataframe

#by filtering address

#ends with 'r'

print(df.filter(df.address.endswith("r")).collect())

 

print("-----------------")

 

#display dataframe

#by filtering address

#contains 'r'

print(df.filter(df.address.contains("r")).collect())

Output:

[Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=9, height=3.69, name='rohith', rollno='004', weight=28), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]

-----------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

-----------------

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]

Conclusion

In this article, we discussed several ways to use the filter() function on a PySpark DataFrame in Python: with relational operators, with the col() function, with multiple conditions, with SQL expressions, and with string methods.
