
PySpark XGBoost

In this PySpark guide, we will discuss how to use the XGBoost machine learning model to predict the price status of a house. First, we will see the approach (the “Steps” section) and discuss how to predict whether the status of a house is “High”, “Low”, “Medium”, or “Extreme”. You can look at the syntax in each step before looking into the example.

XGBOOST

XGBoost stands for “Extreme Gradient Boosting” and is used in predictive analytics for classification/regression problems. It is a machine learning model which works in both classification and regression scenarios. The model works efficiently on high-volume datasets, and parallel processing is also possible. Internally, it builds a number of decision trees one after another, where each new tree boosts the ensemble by correcting the errors of the previous trees, and it combines all the tree outcomes to return the final result. We will see the syntax of this module in this guide with the help of the “House Price Status Prediction” project.
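
To make the boosting idea concrete, here is a minimal sketch of the additive logic in plain Python. This illustrates gradient boosting in general, not XGBoost’s internal implementation; the toy data, the sklearn stump used as the base learner, and the 0.3 learning rate are all assumptions for the example:

import numpy as np
from sklearn.tree import DecisionTreeRegressor  # illustrative base learner, not XGBoost internals

# Toy data (assumption, for illustration only)
X = np.array([[1.0], [2.0], [3.0], [4.0]])
y = np.array([1.5, 1.7, 3.2, 3.9])

prediction = np.zeros_like(y)
for _ in range(10):                              # boosting rounds
    residual = y - prediction                    # what the ensemble still gets wrong
    stump = DecisionTreeRegressor(max_depth=1).fit(X, residual)
    prediction += 0.3 * stump.predict(X)         # add the shrunken tree outcome

print(prediction)                                # approaches y as rounds accumulate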

Steps:

Data Collection: Data is necessary to perform any prediction/analysis. It can be in the form of CSV, XLSX, etc. We can load a CSV file into the Spark environment (DataFrame) using the spark.read.csv() method.
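
As a minimal sketch, loading a CSV file could look like this (the app name and the “data.csv” file name are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Example').getOrCreate()

# header=True treats the first CSV row as column names ('data.csv' is a placeholder)
df = spark.read.csv('data.csv', header=True)
df.show()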

Data Analysis: Analyzing the attributes/columns is known as “data analysis”. The columns which help to predict the class are known as independent attributes. The column which results from the prediction is known as the dependent or target attribute. In this scenario, we can use the columns property to display all the column names. The distinct() method is used to see the unique values.
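
For example, a quick sketch of this step (“Target_Column” is a placeholder name):

# List all the attribute names
print(df.columns)

# See the unique values of a candidate target column
df.select('Target_Column').distinct().show()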

Data Preprocessing: Filtering out the null/missing values is known as preprocessing; we remove all the missing values at this stage. A machine learning model works only on numbers, so all the string categories should be converted to numeric categorical values. In PySpark, we can use “StringIndexer”, which is the class available in the pyspark.ml.feature module, to convert the string categories to numeric. It assigns the numeric values automatically; we need not provide them. You can use the following syntax:

indexer_data = StringIndexer(inputCol="String_Category_ColumnName", outputCol="New_Column_name")
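
Note that StringIndexer is an estimator: you call fit() on the DataFrame and then transform() to get the numeric column, as sketched here (df is a placeholder DataFrame) and demonstrated in the project section below:

indexed_data = indexer_data.fit(df).transform(df)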

 
Vector Assembling: Now, you have data that the machine can understand. In this stage, all the independent attributes should be vectorized into a single column. This can be done using the VectorAssembler class. It takes two parameters: the first parameter is inputCols, which takes a list of independent attribute names, and the second parameter is outputCol, which names the single column that all the inputCols are vectorized into.

assembler = VectorAssembler(inputCols=[columns…], outputCol='vectorized_column_name')

 
Transformation: Now, prepare your data by transforming the updated columns (step 3) using the transform() function.

assembler.transform(indexed_data)

 
Preparing Data for Training & Testing: In this stage, we split the data into training and testing sets. It is better if we use 70% of the data to train the model and 30% of the data to test the model. This can be achieved using the randomSplit() method. It takes a list which holds two float values: the first for the train split and the second for the test split.

train_data,test_data=final_data.select(['features',target_column]).randomSplit([0.70, 0.30])
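
The randomSplit() method also accepts an optional seed parameter that makes the split reproducible across runs; for example (the seed value of 42 is arbitrary):

train_data,test_data=final_data.select(['features',target_column]).randomSplit([0.70, 0.30], seed=42)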

 
Model Fitting: It’s time to fit the XGBoost model. This model is available in the xgboost.spark module. It takes the features column and the class label or target column. The module provides both a classifier class and a regressor class.

SparkXGBClassifier(features_col=features_column_name, label_col=label_column_name, use_gpu=True/False)
SparkXGBRegressor(features_col=features_column_name, label_col=label_column_name, use_gpu=True/False)

 
It takes two mandatory parameters and one optional parameter:

    1. The “features_col” parameter takes the name of the vectorized column of independent attributes which is used to predict the target variable.
    2. The “label_col” parameter takes the name of the target label column.
    3. The optional “use_gpu” parameter runs the model on a GPU (Graphics Processing Unit) when set to “True”. Otherwise, set it to “False” to run on the CPU.
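
Putting the pieces together, a classifier definition and fit could look like this sketch (the column names and the train_data DataFrame are placeholders):

from xgboost.spark import SparkXGBClassifier

classifier = SparkXGBClassifier(features_col='features', label_col='label', use_gpu=False)
model = classifier.fit(train_data)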

House Price Status Prediction

We are going to predict the price status of a house based on the Type, Duration, Stone, and Earthquack_region attributes. The outcome can be “High”, “Medium”, “Low”, or “Extreme”.

Install the PySpark module in your environment. The following is the command to install:

pip install pyspark

 

Consider the “houses.csv” CSV file with 10 records and load it into the PySpark DataFrame. The id column is not considered for the prediction.

import pyspark
from pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName('Linux Hint').getOrCreate()

# Read houses.csv into the PySpark DataFrame
house_data = linuxhint_spark_app.read.csv('houses.csv', header=True)
                                         
# Actual house_data
house_data.show()

 
Output:


Display the independent columns using the select() method.

# Display independent attributes
house_data.select(house_data["Type"],house_data["Duration"],house_data["Stone"],
house_data["Earthquack_region"]).show()

 
Output:


Display the target attribute (Price_Status).

# target attribute
house_data.select(house_data["Price_Status"]).show()

 
Output:


Return the data types of all the columns using the printSchema() function.

# Return datatype of all the columns.
house_data.printSchema()

 
Output:


We can see that 5 columns are of string type. Let’s convert them into numeric categorical values.

Let’s convert the string categorical values to numeric categorical values using StringIndexer in the “Type”, “Duration”, “Stone”, “Price_Status”, and “Earthquack_region” columns, and write these into the Categotical_Type, Categotical_Duration, Categotical_Stone, Status, and Categotical_Earthquack_region columns. Store the result in the indexed_data DataFrame using the fit() and transform() methods.

from pyspark.ml.feature import StringIndexer

# Convert string categorical values to numeric categorical values in the "Type" column.
indexer_data=StringIndexer(inputCol= "Type",outputCol='Categotical_Type')
indexed_data=indexer_data.fit(house_data).transform(house_data)

# Convert string categorical values to numeric categorical values in the "Duration" column.
indexer_data=StringIndexer(inputCol= "Duration",outputCol='Categotical_Duration')
indexed_data=indexer_data.fit(indexed_data).transform(indexed_data)

# Convert string categorical values to numeric categorical values in the "Stone" column.
indexer_data=StringIndexer(inputCol= "Stone",outputCol='Categotical_Stone')
indexed_data=indexer_data.fit(indexed_data).transform(indexed_data)

# Convert string categorical values to numeric categorical values in the "Earthquack_region" column.
indexer_data=StringIndexer(inputCol= "Earthquack_region",outputCol='Categotical_Earthquack_region')
indexed_data=indexer_data.fit(indexed_data).transform(indexed_data)

# Convert string categorical values to numeric categorical values in the "Price_Status" column.
indexer_data=StringIndexer(inputCol= "Price_Status",outputCol='Status')
indexed_data=indexer_data.fit(indexed_data).transform(indexed_data)

# Display the updated DataFrame.
indexed_data.show()

 
Output:

Convert the independent columns into a single vector using the VectorAssembler. The vector column is named “features”.

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['Categotical_Type', 'Categotical_Duration', 'Categotical_Stone', 'Categotical_Earthquack_region'],
    outputCol='features'
)

 
Transform the previous data into a final DataFrame using the transform() function and display it using the show() function.

# Transform the assembler
final= assembler.transform(indexed_data)

# Now display the final/vectorized independent features and the dependent label - 'Status'
final.select('features','Status').show(truncate=False)

 
Output:


Prepare the data for training and testing by splitting it 70-30. Both the resulting train_data and test_data DataFrames hold the “features” and “Status” columns.

# Prepare data for training and testing
train_data,test_data=final.select(['features','Status']).randomSplit([0.70,0.30])
train_data.show()
test_data.show()

 

Fit the XGBoost classifier model.

from xgboost.spark import SparkXGBClassifier

# Define the SparkXGBClassifier class
SparkXGBClassifier_def = SparkXGBClassifier(
  features_col='features',
  label_col='Status',
  use_gpu=False
)

# Fit the model.
classifier_model = SparkXGBClassifier_def.fit(train_data)

 
Apply the model to the test_data using the transform() function and display the predictions.

# Transform the model using the transform() method.
Predict_Data = classifier_model.transform(test_data)
Predict_Data.show()

 
Output:


We can see the results in the “prediction” column.
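
If you want a quick quality check on these predictions, PySpark’s MulticlassClassificationEvaluator can score them; the sketch below assumes the “Status” label column and the default “prediction” column shown above:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the test predictions; accuracy is one of the supported metrics
evaluator = MulticlassClassificationEvaluator(labelCol='Status', predictionCol='prediction', metricName='accuracy')
print(evaluator.evaluate(Predict_Data))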

Conclusion

XGBoost in machine learning works in both classification and regression scenarios. The model works efficiently on high-volume datasets, and parallel processing is also possible. In this guide, we saw how to use the XGBoost classification technique to predict the house price status. Code snippets along with their outputs are provided at each step. We utilized the “StringIndexer” class to convert the categorical strings to numeric categorical values.

About the author

Gottumukkala Sravan Kumar

B.Tech (Hons) in Information Technology; known programming languages: Python, R, PHP, MySQL; published 500+ articles in the computer science domain.