
PySpark zip(), zipWithIndex() and zipWithUniqueId()

RDD stands for Resilient Distributed Dataset, the fundamental data structure in Apache Spark.

The RDD class can be imported from the pyspark.rdd module.

In PySpark, an RDD is created with the parallelize() method.

Syntax:

spark_app.sparkContext.parallelize(data)

Where: data can be one-dimensional (linear) data or two-dimensional (row-column) data.
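
As a minimal sketch (assuming a SparkSession named spark_app, created as in the examples below), both shapes of data can be parallelized:

# one-dimensional (linear) data
linear_rdd = spark_app.sparkContext.parallelize([89, 76, 78])

# two-dimensional (row-column) data: each inner list is one row
table_rdd = spark_app.sparkContext.parallelize([['python', 89], ['java', 76]])

print(linear_rdd.collect())  # [89, 76, 78]
print(table_rdd.collect())  # [['python', 89], ['java', 76]]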

In this PySpark article, we will discuss zip(), zipWithIndex() & zipWithUniqueId().

PySpark zip()

The PySpark zip() function pairs the values of two RDDs positionally, returning a new RDD of tuples.

Syntax:

RDD_data1.zip(RDD_data2)

Here:

  1. RDD_data1 is the first RDD
  2. RDD_data2 is the second RDD

Note that both RDDs must have the same number of partitions and the same number of elements in each partition. Otherwise, Spark raises an error.
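
As a quick sketch of that restriction (assuming a SparkSession named spark_app, as in the examples below), zipping RDDs of different lengths only fails once an action runs, because zip() itself is lazy:

# both RDDs are forced into a single partition so only the element counts differ
rdd_a = spark_app.sparkContext.parallelize([1, 2, 3], 1)
rdd_b = spark_app.sparkContext.parallelize([10, 20], 1)

try:
    # the size mismatch surfaces here, when collect() triggers the job
    print(rdd_a.zip(rdd_b).collect())
except Exception as error:
    print("zip failed:", error)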

Example 1:

In this example, we will return zipped RDD from student_marks1 and student_marks2 numeric RDDs.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student marks data with 5 elements
student_marks1 = spark_app.sparkContext.parallelize([89, 76, 78, 89, 90])

# create student marks data with 5 elements
student_marks2 = spark_app.sparkContext.parallelize([1, 2, 3, 4, 5])

#display the data in both RDDs
print("Actual data in student_marks1: ", student_marks1.collect())
print("Actual data in student_marks2: ", student_marks2.collect())

#zip the two RDDs using zip()
print(student_marks1.zip(student_marks2).collect())

Output:

Actual data in student_marks1: [89, 76, 78, 89, 90]

Actual data in student_marks2: [1, 2, 3, 4, 5]

[(89, 1), (76, 2), (78, 3), (89, 4), (90, 5)]

We can see that each value in the first RDD is paired with the corresponding value in the second RDD.
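
Since zip() returns a pair RDD, key-value actions can consume it directly. A hedged follow-up sketch, reusing the student_marks RDDs from the example above:

# the zipped result is a pair RDD, so it supports key-value actions
pairs = student_marks1.zip(student_marks2)

# collect the pairs into a Python dict; a duplicate key (89) keeps the last value
print(pairs.collectAsMap())  # {89: 4, 76: 2, 78: 3, 90: 5}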

Example 2:

In this example, we will return zipped RDD from student_marks1 and student_marks2 string RDDs.

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student subjects data with 2 elements
subjects1 = spark_app.sparkContext.parallelize(['python', 'java'])

# create student subjects data with 2 elements
subjects2 = spark_app.sparkContext.parallelize(['html', 'java'])

#display the data in both RDDs
print("Actual data in subjects1: ", subjects1.collect())
print("Actual data in subjects2: ", subjects2.collect())

#zip the two RDDs using zip()
print(subjects1.zip(subjects2).collect())

Output:

Actual data in subjects1: ['python', 'java']

Actual data in subjects2: ['html', 'java']

[('python', 'html'), ('java', 'java')]

We can see that the values from both RDDs are zipped together pairwise.

PySpark zipWithIndex()

The PySpark zipWithIndex() function pairs each element of a single RDD with its index, starting at 0. When the RDD has more than one partition, this method triggers a Spark job to compute each partition's starting index.

Syntax:

RDD_data.zipWithIndex()

Here, RDD_data is the RDD
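
As a hedged usage sketch (assuming a SparkSession named spark_app, as in the examples), the resulting (element, index) pairs can be inverted with map() so that the index acts as a key:

# pair each element with its position, starting at 0
subjects = spark_app.sparkContext.parallelize(['python', 'java', 'html'])
indexed = subjects.zipWithIndex()

# swap each pair to (index, element) for index-based lookups
by_index = indexed.map(lambda pair: (pair[1], pair[0]))
print(by_index.collect())  # [(0, 'python'), (1, 'java'), (2, 'html')]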

Example 1:

In this example, we create one RDD with 2 string elements and zip it with indices using zipWithIndex().

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student subjects data with 2 elements
subjects1 = spark_app.sparkContext.parallelize(['python', 'java'])

#display the data in the RDD
print("Actual data in subjects1: ", subjects1.collect())

#zip each element with its index using zipWithIndex()
print(subjects1.zipWithIndex().collect())

Output:

Actual data in subjects1: ['python', 'java']

[('python', 0), ('java', 1)]

We can see that the value python is zipped with index 0 and java is zipped with index 1.

Example 2:

In this example, we create one RDD with 6 string elements and zip it with indices using zipWithIndex().

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student subjects data with 6 elements
subjects1 = spark_app.sparkContext.parallelize(['python', 'java', 'python', 'java', 'python', 'java'])

#display the data in the RDD
print("Actual data in subjects1: ", subjects1.collect())

#zip each element with its index using zipWithIndex()
print(subjects1.zipWithIndex().collect())

Output:

Actual data in subjects1: ['python', 'java', 'python', 'java', 'python', 'java']

[('python', 0), ('java', 1), ('python', 2), ('java', 3), ('python', 4), ('java', 5)]

PySpark zipWithUniqueId()

The PySpark zipWithUniqueId() function is similar to the method above, but the IDs that form the pairs follow a per-partition pattern. Items in the k-th partition receive the IDs:

k, n+k, 2*n+k, 3*n+k, ...

where n represents the number of partitions. Unlike zipWithIndex(), this method does not trigger a Spark job.

Syntax:

RDD_data.zipWithUniqueId()

Here, RDD_data is the RDD

Because the IDs depend on how the data is partitioned, there can be gaps between the assigned values; they are unique but not necessarily consecutive.
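
To make the pattern concrete, here is a minimal sketch (assuming a SparkSession named spark_app, as in the examples) that forces n = 2 partitions, so items in partition 0 receive the IDs 0, 2, 4 and items in partition 1 receive the IDs 1, 3, 5:

# force n = 2 partitions with the numSlices argument of parallelize()
subjects = spark_app.sparkContext.parallelize(['a', 'b', 'c', 'd', 'e', 'f'], 2)

# partition 0 holds a, b, c -> IDs 0, 0+2, 0+4; partition 1 holds d, e, f -> IDs 1, 1+2, 1+4
print(subjects.zipWithUniqueId().collect())
# [('a', 0), ('b', 2), ('c', 4), ('d', 1), ('e', 3), ('f', 5)]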

Example:

#import the pyspark module
import pyspark

#import SparkSession for creating a session
from pyspark.sql import SparkSession

# import RDD from pyspark.rdd
from pyspark.rdd import RDD

#create an app named linuxhint
spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student subjects data with 6 elements
subjects1 = spark_app.sparkContext.parallelize(['python', 'java', 'python', 'java', 'python', 'java'])

#display the data in the RDD
print("Actual data in subjects1: ", subjects1.collect())

#zip each element with a unique ID using zipWithUniqueId()
print(subjects1.zipWithUniqueId().collect())

Output:

Actual data in subjects1: ['python', 'java', 'python', 'java', 'python', 'java']

[('python', 0), ('java', 2), ('python', 4), ('java', 1), ('python', 3), ('java', 5)]

From the above output, we can see that each element is zipped with a unique ID derived from its partition rather than with a sequential index.

Conclusion

In this tutorial, we saw how to zip RDDs with values. zip() pairs the elements of two RDDs positionally, zipWithIndex() pairs each element with its index, and zipWithUniqueId() pairs each element with a unique ID based on its partition.
