Apache Spark

PySpark takeOrdered() and takeSample()

RDD stands for Resilient Distributed Datasets. We can call RDD as a fundamental data structure in Apache Spark.

We need to import RDD from the pyspark.rdd module.

In PySpark to create an RDD, we can use the parallelize() method.

Syntax:

spark_app.sparkContext.parallelize(data)

Where data can be a one-dimensional (linear data) or two-dimensional data (row-column data).

In this PySpark article, we will discuss about takeOrdered() and takeSample() functions.

PySpark takeOrdered()

PySpark takeOrdered() function is used to return a new RDD in an order from existing RDD.

It takes two parameters.

Syntax:

takeOrdered(n,key)

Parameters:

  1. n is used to return the number of elements in an order in the new RDD.
  2. key is an optional parameter which takes an anonymous function to get the elements in ascending order or descending order.

For ascending order, the function can be – key=lambda element: element

For descending order, the function can be – key=lambda element: -element

It takes minus (-) sign to take elements in descending order.

If it is not specified, the elements in the new RDD will be in ascending order.

Example 1:

In this example, we are creating an RDD with 20 elements and applying the takeOrdered() function to get the first 10 elements. First 12 elements separately in ascending order using key parameter.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student marks data with 20 elements
student_marks =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

#display data in RDD
print("Actual data in RDD: ",student_marks.map(lambda element: element).collect())

#Get 10 elements from RDD in ascending order
print("10 elements from RDD in an ascending order with key as parameter: ",student_marks.takeOrdered(10,key=lambda element: element))

#Get 12 elements from RDD in an ascending order
print("12 elements from RDD in an ascending order with key as parameter: ",student_marks.takeOrdered(12,key=lambda element: element))

Output:

Actual data in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
10 elements from RDD in an ascending order with key as parameter: [21, 22, 23, 34, 34, 34, 34, 43, 45, 54]
12 elements from RDD in an ascending order with key as parameter: [21, 22, 23, 34, 34, 34, 34, 43, 45, 54, 56, 56]

You can see that new RDDs are returned with elements in ascending order.

Example 2:

In this example, we are creating an RDD with 20 elements and applying the takeOrdered() function to get the first 10 elements. The first 12 elements are separated in ascending order without key parameter.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student marks data with 20 elements
student_marks =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

#display data in RDD
print("Actual data in RDD: ",student_marks.map(lambda element: element).collect())

#Get 10 elements from RDD in ascending order
print("10 elements from RDD in an ascending order: ",student_marks.takeOrdered(10))

#Get 12 elements from RDD in an ascending order
print("12 elements from RDD in an ascending order: ",student_marks.takeOrdered(12))

Output:

Actual data in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
10 elements from RDD in an ascending order: [21, 22, 23, 34, 34, 34, 34, 43, 45, 54]
12 elements from RDD in an ascending order: [21, 22, 23, 34, 34, 34, 34, 43, 45, 54, 56, 56]

You can see that new RDDs are returned with elements in ascending order still without key parameter.

Example 3:

In this example, we are creating an RDD with 20 elements and applying the takeOrdered() function to get the first 10 element. The first 12 elements are separated in descending order.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student marks data with 20 elements
student_marks =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

#display data in RDD
print("Actual data in RDD: ",student_marks.map(lambda element: element).collect())

#Get 10 elements from RDD in descending order
print("10 elements from RDD in an descending order: ",student_marks.takeOrdered(10,key=lambda element: -element))

#Get 12 elements from RDD in descending order
print("12 elements from RDD in an descending order: ",student_marks.takeOrdered(12,key=lambda element: -element))

Output:

Actual data in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
10 elements from RDD in an descending order: [100, 90, 89, 89, 78, 78, 76, 56, 56, 56]
12 elements from RDD in an descending order: [100, 90, 89, 89, 78, 78, 76, 56, 56, 56, 54, 45]

You can see that new RDDs are returned with elements in descending order. For this case, you must specify the key parameter.

PySpark takeSample()

PySpark takeSample() function is used to return a new RDD with random values from an existing RDD. It takes two parameters.

Syntax:

takeSample(replace,n)

Parameters:

  1. replace takes boolean values. If it is set to True, then the random values that are returning into new RDD are unique (they are not repeated/replaced again). If it is set to False, then the random values that are returning into new RDD can be repeated (they can be repeated/replaced again)
  2. n is used to return the number of random elements into a new RDD.

Example 1:

In this example, we will create an RDD with 20 elements and return 10 elements. 12 elements are separated without replacement.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student marks data with 20 elements
student_marks =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

#display data in RDD
print("Actual data in RDD: ",student_marks.map(lambda element: element).collect())

#get 10 elements from the RDD without replacement
print("10 elements from the RDD without replacement: ",student_marks.takeSample(False,10))

#get 12 elements from the RDD without replacement
print("12 elements from the RDD without replacement: ",student_marks.takeSample(False,12))

Output:

Actual data in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
10 elements from the RDD without replacement: [89, 23, 78, 56, 43, 100, 89, 78, 76, 21]
12 elements from the RDD without replacement: [89, 23, 43, 54, 34, 45, 78, 56, 22, 56, 34, 34]

We can observe that random values are not replaced in the new RDD.

Example 2:

In this example, we will create an RDD with 20 elements and return 10 elements. 12 elements are separated with replacement.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student marks data with 20 elements
student_marks =spark_app.sparkContext.parallelize([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])

#display data in RDD
print("Actual data in RDD: ",student_marks.map(lambda element: element).collect())

#get 10 elements from the RDD with replacement
print("10 elements from the RDD with replacement: ",student_marks.takeSample(True,10))

#get 12 elements from the RDD with replacement
print("12 elements from the RDD with replacement: ",student_marks.takeSample(True,12))

Output:

Actual data in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
10 elements from the RDD with replacement: [90, 34, 78, 34, 34, 89, 23, 22, 76, 21]
12 elements from the RDD with replacement: [43, 78, 78, 78, 78, 89, 21, 100, 89, 78, 34, 22]

We can observe that random values are replaced in both the new RDDs since we set the parameter to True.

Conclusion

In this PySpark tutorial, we saw how to use takeOrdered() & takeSample() functions on RDD. Both are used to return the new RDD from the existing RDD.

takeOrdered() returns the new RDD from an existing RDD with elements in a sorting order. It is possible to get the sorted data in descending order by using the key parameter through the lambda function.

takeSample() returns the new RDD from an existing RDD with some elements randomly. It is possible to repeat the randomly generated values in the new RDD again using the replace parameter.

About the author

Gottumukkala Sravan Kumar

B tech-hon's in Information Technology; Known programming languages - Python, R , PHP MySQL; Published 500+ articles on computer science domain