
PySpark – filter()

In Python, PySpark is the Spark module that provides Spark-like processing through DataFrames. In PySpark, filter() is used to filter the rows in a DataFrame: it returns a new DataFrame containing only the rows of the existing DataFrame that satisfy a condition. We will demonstrate five ways to use the filter() function in this article:

  1. Method – 1: filter() with Relational Operators
  2. Method – 2: filter() with Relational Operators Using the col() Function
  3. Method – 3: filter() with Relational Operators on Multiple Conditions
  4. Method – 4: filter() with SQL Expression
  5. Method – 5: filter() with String Methods

Let’s first create a PySpark DataFrame that we can use for the examples.

Example:

In this example, we are going to create a PySpark DataFrame with 5 rows and 6 columns and display it using the show() method.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,
 'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,
 'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,
 'height':2.79,'weight':17,'address':'patna'},
{'rollno':'004','name':'rohith','age':9,
 'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,
 'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

#display dataframe
df.show()

Output:
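
With the data above, show() should print a table similar to the following (the columns appear in alphabetical order because the rows are built from Python dicts):

+-------+---+------+----------------+------+------+
|address|age|height|            name|rollno|weight|
+-------+---+------+----------------+------+------+
| guntur| 23|  5.79|          sravan|   001|    67|
|    hyd| 16|  3.79|          ojaswi|   002|    34|
|  patna|  7|  2.79|gnanesh chowdary|   003|    17|
|    hyd|  9|  3.69|          rohith|   004|    28|
|    hyd| 37|  5.59|         sridevi|   005|    54|
+-------+---+------+----------------+------+------+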

Method – 1: filter() with Relational Operators

Here, we are using the filter() function to filter the PySpark DataFrame with relational operators like >, <, ==, etc.

Syntax:

dataframe.filter(dataframe.column_name   operator   value)

Here, the condition passed to filter() has three parts:

  1. dataframe.column_name is the column where the filter is applied.
  2. operator is the relational operator (>, <, >=, <=, ==, !=).
  3. value is the string/numeric value compared with the column values.

Example:

In this example, we are going to filter the dataframe based on the age, height, weight, and name columns with different relational operators and display the filtered rows using the collect() method.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,
 'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,
 'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,
 'height':2.79,'weight':17,'address':'patna'},
{'rollno':'004','name':'rohith','age':9,
 'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,
 'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

#display dataframe
#by filtering age as 23
print(df.filter(df.age == 23).collect())
print("---------------------------")

#display dataframe
#by filtering height as 2.79
print(df.filter(df.height == 2.79).collect())
print("---------------------------")

#display dataframe
#by filtering weight greater than 30
print(df.filter(df.weight > 30).collect())
print("---------------------------")

#display dataframe
#by filtering name as sravan
print(df.filter(df.name == 'sravan').collect())

Output:

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]
---------------------------
[Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17)]
---------------------------
[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]
---------------------------
[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]
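
The remaining relational operators work the same way. A minimal sketch, assuming the df created above, using != and >=:

#display dataframe (assuming the df from the example above)
#by filtering address not equal to 'hyd'
print(df.filter(df.address != 'hyd').collect())
print("---------------------------")

#display dataframe
#by filtering age greater than or equal to 16
print(df.filter(df.age >= 16).collect())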

Method – 2: filter() with Relational Operators Using the col() Function

Here, we are using the filter() function to filter the PySpark DataFrame with relational operators like >, <, ==, etc. through the col() function.

Syntax:

dataframe.filter(col("column_name")   operator   value)

Here, the condition passed to filter() has three parts:

  1. col("column_name") is the column where the filter is applied.
  2. operator is the relational operator (>, <, >=, <=, ==, !=).
  3. value is the string/numeric value compared with the column values.

Example:

In this example, we are going to filter the dataframe based on the age, height, weight, and name columns with different relational operators and display the filtered rows using the collect() method.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,
 'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,
 'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,
 'height':2.79,'weight':17,'address':'patna'},
{'rollno':'004','name':'rohith','age':9,
 'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,
 'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

#display dataframe
#by filtering age as 23
print(df.filter(col('age') == 23).collect())
print("---------------------------")

#display dataframe
#by filtering height as 2.79
print(df.filter(col('height') == 2.79).collect())
print("---------------------------")

#display dataframe
#by filtering weight greater than 30
print(df.filter(col('weight') > 30).collect())
print("---------------------------")

#display dataframe
#by filtering name as sravan
print(df.filter(col('name') == 'sravan').collect())

Output:

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]
---------------------------
[Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17)]
---------------------------
[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]
---------------------------
[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]
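
The col() form behaves exactly like the dataframe.column_name form, but it also works for column names that are not valid Python attributes (for example, names containing a space), and it does not require a reference to the dataframe variable. A minimal sketch, assuming the df created above:

#display dataframe (assuming the df from the example above)
#by filtering height greater than or equal to 3.69
print(df.filter(col('height') >= 3.69).collect())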

Method – 3: filter() with Relational Operators on Multiple Conditions

Here, we are using the filter() function to filter the PySpark DataFrame with relational operators like >, <, ==, etc. on multiple conditions.

Syntax:

dataframe.filter((condition1) operator (condition2) …)

Here, filter() takes a single combined condition:

  1. condition specifies (dataframe.column_name   operator   value), as in the previous methods.
  2. operator is the logical operator that joins the conditions: & (and) or | (or).
  3. Each condition must be wrapped in parentheses, because & and | bind more tightly than the relational operators in Python.

Example:

In this example, we are going to filter the dataframe based on the age column with the or (|) and and (&) operators and display the filtered rows using the collect() method.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,
 'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,
 'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,
 'height':2.79,'weight':17,'address':'patna'},
{'rollno':'004','name':'rohith','age':9,
 'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,
 'height':5.59,'weight':54,'address':'hyd'}]


# create the dataframe
df = spark_app.createDataFrame(students)

#display dataframe
#by filtering age greater than 10 and less than 21
print(df.filter((df.age > 10) & (df.age < 21)).collect())
print("---------------------------")

#display dataframe
#by filtering age greater than 10 or less than 21
print(df.filter((df.age > 10) | (df.age < 21)).collect())

Output:

[Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34)]
---------------------------
[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17), Row(address='hyd', age=9, height=3.69, name='rohith', rollno='004', weight=28), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]
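
A combined condition can also be negated with the ~ operator. A minimal sketch, assuming the df created above, that keeps the rows whose age is not between 10 and 21:

#display dataframe (assuming the df from the example above)
#by filtering age not in the range 10 to 21
print(df.filter(~((df.age > 10) & (df.age < 21))).collect())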

Method – 4: filter() with SQL Expression

Here, we are using the filter() function to filter the PySpark DataFrame with a SQL expression. The condition is passed as a string enclosed in double quotes, and inside it we can use the same relational operators as before.

Syntax:

dataframe.filter("column_name   operator   value")

Here, the condition inside the SQL expression has three parts:

  1. column_name is the column where the filter is applied.
  2. operator is the relational operator.
  3. value is the string/numeric value compared with the column values.

Example:

In this example, we are going to filter the dataframe based on the age, height, weight, and name columns with different relational operators and display the filtered rows using the collect() method.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,
 'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,
 'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,
 'height':2.79,'weight':17,'address':'patna'},
{'rollno':'004','name':'rohith','age':9,
 'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,
 'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

#display dataframe
#by filtering age as 23
#using sql expression
print(df.filter("age == 23").collect())
print("---------------------------")

#display dataframe
#by filtering height as 2.79
#using sql expression
print(df.filter("height == 2.79").collect())
print("---------------------------")

#display dataframe
#by filtering weight greater than 30
#using sql expression
print(df.filter("weight >30").collect())
print("---------------------------")

#display dataframe
#by filtering name as sravan
#using sql expression
print(df.filter("name == 'sravan'").collect())

Output:

[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]
---------------------------
[Row(address='patna', age=7, height=2.79, name='gnanesh chowdary', rollno='003', weight=17)]
---------------------------
[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67), Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]
---------------------------
[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]
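
SQL expressions also accept SQL keywords such as and, or, between, and like, so multiple conditions can be combined without the parenthesized & / | form. A minimal sketch, assuming the df created above:

#display dataframe (assuming the df from the example above)
#by filtering age between 10 and 21 (inclusive)
#using sql expression
print(df.filter("age between 10 and 21").collect())
print("---------------------------")

#display dataframe
#by filtering address values that start with 'h'
#using sql expression
print(df.filter("address like 'h%'").collect())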

Method – 5: filter() with String Methods

Here, we are using the filter() function to filter the PySpark DataFrame with string methods. These are applied to columns whose datatype is string. They are:

startswith() – checks whether the column value starts with the given character or substring.

Syntax:

dataframe.filter(dataframe.column_name.startswith("character"))

Here, column_name is the column where the filter is applied, keeping the rows whose value starts with the given character.

endswith() – checks whether the column value ends with the given character or substring.

Syntax:

dataframe.filter(dataframe.column_name.endswith("character"))

Here, column_name is the column where the filter is applied, keeping the rows whose value ends with the given character.

contains() – checks whether the column value contains the given character or substring.

Syntax:

dataframe.filter(dataframe.column_name.contains("character"))

Here, column_name is the column where the filter is applied, keeping the rows whose value contains the given character.

Example:

In this example, we are going to filter based on the address column and display the filtered rows using the collect() method.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

#import the col function
from pyspark.sql.functions import col

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[
{'rollno':'001','name':'sravan','age':23,
 'height':5.79,'weight':67,'address':'guntur'},
{'rollno':'002','name':'ojaswi','age':16,
 'height':3.79,'weight':34,'address':'hyd'},
{'rollno':'003','name':'gnanesh chowdary','age':7,
 'height':2.79,'weight':17,'address':'patna'},
{'rollno':'004','name':'rohith','age':9,
 'height':3.69,'weight':28,'address':'hyd'},
{'rollno':'005','name':'sridevi','age':37,
 'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark_app.createDataFrame(students)

#display dataframe
#by filtering address
#starts with 'h'
print(df.filter(df.address.startswith("h")).collect())
print("-----------------")

#display dataframe
#by filtering address
#ends with 'r'
print(df.filter(df.address.endswith("r")).collect())
print("-----------------")

#display dataframe
#by filtering address
#contains 'r'
print(df.filter(df.address.contains("r")).collect())

Output:

[Row(address='hyd', age=16, height=3.79, name='ojaswi', rollno='002', weight=34), Row(address='hyd', age=9, height=3.69, name='rohith', rollno='004', weight=28), Row(address='hyd', age=37, height=5.59, name='sridevi', rollno='005', weight=54)]
-----------------
[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]
-----------------
[Row(address='guntur', age=23, height=5.79, name='sravan', rollno='001', weight=67)]
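
These methods accept whole substrings as well as single characters, and the resulting conditions can be combined like any other filter condition. A minimal sketch, assuming the df created above:

#display dataframe (assuming the df from the example above)
#by filtering address that starts with 'h'
#and name that contains 'i'
print(df.filter((df.address.startswith("h")) & (df.name.contains("i"))).collect())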

Conclusion

In this article, we discussed how to use the filter() function on a PySpark DataFrame in Python: with relational operators, through the col() function, on multiple conditions, with SQL expressions, and with string methods.
