PySpark zip(), zipWithIndex() and zipWithUniqueId()

In this article we will introduce and demonstrate PySpark’s zip(), zipWithIndex() and zipWithUniqueId() methods.

Before we get started with these methods, we need an RDD to work with. RDD stands for Resilient Distributed Dataset, the fundamental data structure in Apache Spark. The examples in this article import RDD from the pyspark.rdd module, although parallelize() already returns an RDD, so the import is optional. In PySpark, we can create an RDD with the parallelize() method.

Syntax:

spark_app.sparkContext.parallelize(data)

Here, data can be one-dimensional (a flat list of values) or two-dimensional (rows and columns, such as a list of lists).

PySpark zip()

The PySpark zip() function pairs the elements of two RDDs by position and returns the pairs as a new RDD.

Syntax:

RDD_data1.zip(RDD_data2)

Here:

  1. RDD_data1 is the first RDD
  2. RDD_data2 is the second RDD

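Note that both RDDs must have the same number of partitions and the same number of elements in each partition (for example, when one RDD was created by applying map() to the other). Otherwise, Spark raises an error.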
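The requirement above can be pictured with a small plain-Python model. This is only a sketch, not Spark code: each RDD is represented as a list of partitions, and the function name zip_partitions is hypothetical. It pairs elements partition by partition and fails when the layouts differ, even if the total element count is the same.

```python
def zip_partitions(left, right):
    """Model of a partition-wise zip: each argument is a list of partitions."""
    if len(left) != len(right):
        raise ValueError("different number of partitions")
    zipped = []
    for part_a, part_b in zip(left, right):
        if len(part_a) != len(part_b):
            raise ValueError("different number of elements in a partition")
        # Pair the elements of the two matching partitions by position
        zipped.append(list(zip(part_a, part_b)))
    return zipped


# Five elements each, laid out identically over two partitions
marks = [[89, 76], [78, 89, 90]]
ranks = [[1, 2], [3, 4, 5]]
print(zip_partitions(marks, ranks))
# [[(89, 1), (76, 2)], [(78, 3), (89, 4), (90, 5)]]

# Also five elements, but split differently across the partitions
uneven = [[1, 2, 3], [4, 5]]
try:
    zip_partitions(marks, uneven)
except ValueError as err:
    print("cannot zip:", err)
# cannot zip: different number of elements in a partition
```

In the model, the second call fails although both inputs hold five elements, which is why matching only the total count is not enough.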

Example 1:

In this example, we zip two numeric RDDs, student_marks1 and student_marks2.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student marks data with 5 elements
student_marks1 =spark_app.sparkContext.parallelize([89,76,78,89,90])

# create student marks data with 5 elements
student_marks2 =spark_app.sparkContext.parallelize([1,2,3,4,5])

#display data in RDD
print("Actual data in student_marks1: ",student_marks1.map(lambda element: element).collect())
print("Actual data in student_marks2: ",student_marks2.map(lambda element: element).collect())

#zip the two RDD's using zip()
print(student_marks1.zip(student_marks2).collect())

Output:

Actual data in student_marks1: [89, 76, 78, 89, 90]
Actual data in student_marks2: [1, 2, 3, 4, 5]
[(89, 1), (76, 2), (78, 3), (89, 4), (90, 5)]

We can see that each value in the first RDD is paired with the value at the same position in the second RDD.

Example 2:

In this example, we zip two string RDDs, subjects1 and subjects2.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student subjects data with 2 elements
subjects1 =spark_app.sparkContext.parallelize(['python','java'])

# create student subjects data with 2 elements
subjects2 =spark_app.sparkContext.parallelize(['html','java'])

#display data in RDD
print("Actual data in subjects1: ",subjects1.map(lambda element: element).collect())
print("Actual data in subjects2: ",subjects2.map(lambda element: element).collect())

#zip the two RDD's using zip()
print(subjects1.zip(subjects2).collect())

Output:

Actual data in subjects1: ['python', 'java']
Actual data in subjects2: ['html', 'java']
[('python', 'html'), ('java', 'java')]

We can see that the values from both RDDs are paired by position.

PySpark zipWithIndex()

The PySpark zipWithIndex() function pairs each element of a single RDD with its index. Indices start at 0 and follow the order of the partitions and then the order of the elements within each partition.

Syntax:

RDD_data.zipWithIndex()

Here, RDD_data is the RDD whose elements are paired with their indices.

Example 1:

In this example, we create an RDD with 2 string elements and pair each element with its index using zipWithIndex().

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student subjects data with 2 elements
subjects1 =spark_app.sparkContext.parallelize(['python','java'])

#display data in RDD
print("Actual data in subjects1: ",subjects1.map(lambda element: element).collect())

#zip the RDD with element indices using zipWithIndex()
print(subjects1.zipWithIndex().collect())

Output:

Actual data in subjects1: ['python', 'java']
[('python', 0), ('java', 1)]

We can see that the value python is zipped with index 0 and java is zipped with index 1.

Example 2:

In this example, we create an RDD with 6 string elements and pair each element with its index using zipWithIndex().

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student subjects data with 6 elements
subjects1 =spark_app.sparkContext.parallelize(['python','java','python','java','python','java'])

#display data in RDD
print("Actual data in subjects1: ",subjects1.map(lambda element: element).collect())

#zip the RDD with element indices using zipWithIndex()
print(subjects1.zipWithIndex().collect())

Output:

Actual data in subjects1: ['python', 'java', 'python', 'java', 'python', 'java']
[('python', 0), ('java', 1), ('python', 2), ('java', 3), ('python', 4), ('java', 5)]
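The indices in the output above follow partition order first and element order second. The plain-Python model below sketches that rule. It is not Spark code: the RDD is represented as a list of partitions, the function name zip_with_index is hypothetical, and the split into two partitions of three elements is an assumption made for illustration.

```python
from itertools import accumulate


def zip_with_index(partitions):
    """Model of index assignment: partitions is a list of lists."""
    sizes = [len(part) for part in partitions]
    # The starting index of each partition is the number of elements before it
    offsets = [0] + list(accumulate(sizes))[:-1]
    return [
        [(item, offset + pos) for pos, item in enumerate(part)]
        for part, offset in zip(partitions, offsets)
    ]


# Assumed layout: six elements in two partitions of three
partitions = [["python", "java", "python"], ["java", "python", "java"]]
indexed = zip_with_index(partitions)
flat = [pair for part in indexed for pair in part]
print(flat)
# [('python', 0), ('java', 1), ('python', 2), ('java', 3), ('python', 4), ('java', 5)]
```

The model needs the size of every earlier partition before it can number a later one. This matches the note in the Spark documentation that zipWithIndex() has to trigger a Spark job when the RDD has more than one partition.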

PySpark zipWithUniqueId()

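The PySpark zipWithUniqueId() function is similar to zipWithIndex(), but the value paired with each element is a unique ID rather than a consecutive index. Elements in the k-th partition (counting from 0) receive the IDs:

k, 1*n+k, 2*n+k, 3*n+k, ...

Here, n is the number of partitions and k is the index of the partition that holds the element.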

Syntax:

RDD_data.zipWithUniqueId()

Here, RDD_data is the RDD whose elements are paired with unique IDs.

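Because the IDs depend on how the elements are spread across partitions, there can be gaps in the sequence of IDs when the partitions hold different numbers of elements.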

Example:

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student subjects data with 6 elements
subjects1 =spark_app.sparkContext.parallelize(['python','java','python','java','python','java'])

#display data in RDD
print("Actual data in subjects1: ",subjects1.map(lambda element: element).collect())

#zip the RDD with unique IDs using zipWithUniqueId()
print(subjects1.zipWithUniqueId().collect())

Output:

Actual data in subjects1: ['python', 'java', 'python', 'java', 'python', 'java']
[('python', 0), ('java', 2), ('python', 4), ('java', 1), ('python', 3), ('java', 5)]

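From the above output, we can see that the IDs are unique but are not assigned in reading order: they depend on the partition in which each element is stored. The exact IDs can vary with the number of partitions in your environment.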
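The output above is consistent with the six elements being split into two partitions of three. The plain-Python model below reproduces it under that assumption. It is a sketch of the ID pattern only, not Spark code, and the function name zip_with_unique_id is hypothetical.

```python
def zip_with_unique_id(partitions):
    """Model of the ID pattern: item number pos in partition k gets pos*n + k."""
    n = len(partitions)  # number of partitions
    return [
        [(item, pos * n + k) for pos, item in enumerate(part)]
        for k, part in enumerate(partitions)
    ]


# Assumed layout: six elements in two partitions of three
partitions = [["python", "java", "python"], ["java", "python", "java"]]
flat = [pair for part in zip_with_unique_id(partitions) for pair in part]
print(flat)
# [('python', 0), ('java', 2), ('python', 4), ('java', 1), ('python', 3), ('java', 5)]

# Uneven partitions leave gaps: ID 3 is never used here
uneven = [["a", "b", "c"], ["d"]]
print(zip_with_unique_id(uneven))
# [[('a', 0), ('b', 2), ('c', 4)], [('d', 1)]]
```

Unlike the index calculation, each ID here depends only on the partition number and the position inside that partition, so no partition needs to know the size of any other.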

Conclusion

In this tutorial, we saw how to zip an RDD with other values. zip() pairs the elements of two RDDs by position. zipWithIndex() pairs each element with its index, and zipWithUniqueId() pairs each element with a unique ID derived from its partition.

About the author

Gottumukkala Sravan Kumar

B.Tech (Hons) in Information Technology; known programming languages: Python, R, PHP, MySQL; published 500+ articles in the computer science domain.