The aim of this blog is to explain how to use SparkML to build a classification model. To explain the usage better, I am going to try to solve the Kaggle knowledge challenge - Titanic: Machine Learning from Disaster. The source code of this project is available on my GitHub.
Updated Code for Spark 2.0 can be found in this branch.
In this challenge, we are given a set of details about passengers - such as name, gender, fare and cabin - along with whether the person survived the Titanic disaster. Based on this we have to build a model that can predict, given another passenger, whether he/she is likely to survive. This is an example of binary classification, where there are only two possible classes (1 if the passenger survived and 0 if not).
- The first step when trying to build a machine learning model is to analyze and understand the data you have, so that you can decide which features should be used for building the model, whether those features are numeric or categorical, what the mean, max or min of your numerical features is, and so on.
- Once the data is analyzed, the next step is feature selection, where we decide which features are relevant for building the model.
- Next is data preprocessing. The input data you receive for modeling is rarely clean. During this stage we decide, for example, what to do with missing values - whether to drop rows having nulls, fill them with the average value of the feature (if the feature is numerical), fill them with the most frequently occurring value of the feature (if the feature is categorical), and so on.
- Next comes the feature engineering and feature transformation step. In feature engineering we derive new features from existing ones, and during feature transformation we transform existing features so that they can be used for building the model.
- Finally we build the model using the selected features and do prediction on a new set of data.
We will be implementing all of the above steps using Spark and Scala and will be building a machine learning pipeline - the overall flow can be shown by the diagram below. The grey section of the diagram shows the model building flow and the blue section of the diagram shows the flow for making prediction.
Load and Analyze data
As mentioned earlier, the first step is to analyze the data. To do that, we first have to load the data into Spark. Download the train.csv file from here, open the file and check its contents.
As you can see, the file contains a header row with PassengerId, Survived, Pclass, Name, Sex, Age, SibSp, Parch, Ticket, Fare, Cabin and Embarked. You can find more information about what each of these fields means on the Kaggle website. Move this file to some folder in HDFS (I have kept mine at /kaggle/titanic/train.csv). The data is in CSV format; to load CSV files we will use the spark-csv library.
We will define a simple load function that can be used to load a CSV file. First, start your spark-shell using the command below.
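A typical invocation looks like this (the spark-csv version and Scala build shown here are assumptions - use the artifact matching your Spark installation):

```
spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
```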
Note: You will have to import a few classes for this project, which can be found here
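The load helper I use looks roughly like the sketch below; treat the exact signature and option handling as assumptions rather than the definitive code from the repository.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical sketch of the load helper: reads a CSV file with spark-csv,
// infers the schema, and renames the columns using featuresArr when provided.
def load(path: String, sqlContext: SQLContext, featuresArr: Seq[String]): DataFrame = {
  val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")        // the first line of the file is a header row
    .option("inferSchema", "true")   // infer column types instead of defaulting to String
    .load(path)
  if (featuresArr.nonEmpty) df.toDF(featuresArr: _*) else df
}
```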
The method takes three inputs - the path to the CSV file, the sqlContext, and a featuresArr which is used to name the columns being loaded. We don't really have to pass the featuresArr here, since our CSV file contains a header row. If it did not, the columns would have been assigned default names such as C0, C1, etc.
Use the load method defined above to load the CSV file and create a DataFrame.
Note: We are caching the DataFrame in memory by calling cache(); this will help improve performance during model building.
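Assuming the hypothetical load helper above, loading and caching the training data could look like this (df and featuresArr are the names I will use for the rest of the walkthrough):

```scala
val featuresArr = Seq("PassengerId", "Survived", "Pclass", "Name", "Sex", "Age",
                      "SibSp", "Parch", "Ticket", "Fare", "Cabin", "Embarked")

// Load train.csv from HDFS and keep the DataFrame in memory for faster iteration.
var df = load("/kaggle/titanic/train.csv", sqlContext, featuresArr)
df.cache()
```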
Now we will explore the loaded DataFrame to understand the data better. We can check the schema of the loaded data by calling printSchema() on it.
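For example:

```scala
// Print the column names and inferred data types.
df.printSchema()
```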
As you can see, the spark-csv library has inferred the data type of each column. If you go back and check the load method, you can see that we have used .option("inferSchema", "true"), which tells the library to do so. If it is not set, all the fields will be set to type String.
The show() method of a DataFrame can be used to display it in tabular form. You can also pass an int to this method to specify how many rows should be displayed, e.g.,
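A quick example with the df loaded earlier:

```scala
// Display the first 10 rows in tabular form.
df.show(10)
```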
You can also see summary statistics for any numerical column by using describe().
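For example, for the Age column:

```scala
// count, mean, stddev, min and max of the Age column.
df.describe("Age").show()
```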
Play around with the other columns as well until you get a feel for the data.
Fill missing values
On analyzing the data, you can see a few irregularities in it. For example, there are missing values in the Age column. Similarly, there are null/missing values in Cabin, Fare and Embarked. There are several techniques for filling in missing values. You can:
- Ignore/drop the rows having missing values. This can be done in Spark by calling na.drop() on the DataFrame.
- If the column is numerical, fill in the missing values with the mean/average value of the column. We are going to replace the missing values in the Age column using this method, as sketched after this list.
- If the column is categorical, fill in with the most frequently occurring category.
- Build a machine learning model which can predict those missing values.
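A minimal sketch of the mean-fill approach for Age, using the df from earlier (the original code may compute or round the value differently):

```scala
import org.apache.spark.sql.functions.avg

// Compute the average age over the non-null values and use it to fill the gaps.
val avgAge = df.select(avg("Age")).first().getDouble(0)
df = df.na.fill(Map("Age" -> avgAge))
```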
Discover new features
In many cases, there will be features in your input data that can be used to derive new features, which will help in building a better model. This is also called Feature Engineering. For example, if you take a closer look at the column ‘Name’, you can see that the format is LastName, Title. FirstName. We could not possibly make any prediction based on the passenger’s name, but maybe there is some relation between the title and the passenger’s survival. So, we will extract the title from each name and form a new column/feature. The udf findTitle is used for extracting the title from a given name.
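A sketch of what such a udf could look like - the exact parsing (and any grouping of rare titles) in the original code may differ:

```scala
import org.apache.spark.sql.functions.udf

// Names look like "Braund, Mr. Owen Harris"; the title sits between the comma and the dot.
val findTitle = udf { (name: String) =>
  if (name != null && name.contains(",") && name.contains("."))
    name.split(",")(1).split("\\.")(0).trim
  else
    "Unknown"
}
```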
DataFrame provides a method withColumn which can be used for adding a new column or replacing an existing one. It takes two parameters - the name of the new column and a Column expression built from the current DataFrame. If the name you pass already exists in the DataFrame, that column is replaced; otherwise a new column is added.
We will now apply the function findTitle on the Name column to extract the title and create a new column - Title.
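Using the names from the earlier sketches:

```scala
// Derive the Title column from Name using the udf defined above.
df = df.withColumn("Title", findTitle(df("Name")))
```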
Similarly, we will define three other udfs, which we will use to generate more new features.
An ML Pipeline has a sequence of pipeline components. There are two types of components - Transformers and Estimators. A Transformer transforms the input DataFrame into a new DataFrame using the method transform(). An Estimator first fits a model to the data using the method fit(), and the fitted model is then used to transform. This will become clearer as you go through the components below.
To build a model in Spark, the features must be of type Double, but we have a few features that are of type String. Spark provides a feature transformer - StringIndexer - which can be used for this transformation.
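A sketch of the indexer for Title (TitleIndex matches the column name described below):

```scala
import org.apache.spark.ml.feature.StringIndexer

// Estimator that maps each distinct Title value to a numeric index in a new TitleIndex column.
val titleIndexer = new StringIndexer()
  .setInputCol("Title")
  .setOutputCol("TitleIndex")
```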
Here StringIndexer is an Estimator that transforms the column Title, generates indices for the words and creates a new column named TitleIndex. The fit method of StringIndexer converts the column to StringType (if it is not already of StringType) and then counts the occurrences of each word. It then sorts these words in descending order of frequency and assigns an index to each word. StringIndexer.fit() returns a StringIndexerModel, which is a Transformer. StringIndexerModel.transform() assigns the generated index to each value of the column in the given DataFrame.
Mr. is the most frequent title in this data, so it is given index 0. Similarly, we will also create an indexer for the feature Sex.
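Following the same pattern (SexIndex is an assumed output column name):

```scala
// Index the Sex column the same way.
val sexIndexer = new StringIndexer()
  .setInputCol("Sex")
  .setOutputCol("SexIndex")
```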
Note that we did not call fit() or transform() here; that will be taken care of by the Pipeline. The Pipeline will execute each stage and pass the result of the current stage to the next. If a stage is a Transformer, the Pipeline will call transform() on it; if it is an Estimator, the Pipeline will first call fit() and then transform(). But if the Estimator is the last stage in the pipeline, transform() won't be called.
Binning / Bucketing
During binning/bucketing, a column with continuous values is converted into buckets. We define the start and end value of each bucket while creating the Bucketizer - which is a Transformer. We are going to bucketize the column ‘Fare’.
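A sketch of the bucketizer - the split points below are an assumption, so choose boundaries that suit the data:

```scala
import org.apache.spark.ml.feature.Bucketizer

// Each consecutive pair of values defines one bucket; the last bucket is open-ended.
val fareSplits = Array(0.0, 10.0, 20.0, 30.0, 50.0, Double.PositiveInfinity)
val fareBucketizer = new Bucketizer()
  .setInputCol("Fare")
  .setOutputCol("FareBucketed")
  .setSplits(fareSplits)
```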
VectorAssembler is used for assembling features into a vector. We will pass all the columns that we are going to use for the prediction to the VectorAssembler and it will create a new vector column.
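A sketch of the assembler - the exact set of input columns is an assumption based on the features discussed in this post:

```scala
import org.apache.spark.ml.feature.VectorAssembler

// Combine the selected numeric and indexed columns into a single vector column.
val assembler = new VectorAssembler()
  .setInputCols(Array("Pclass", "SexIndex", "Age", "SibSp", "Parch", "FareBucketed", "TitleIndex"))
  .setOutputCol("features_temp")
```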
Next we will normalize or standardize the data using the transformer -
Normalizer. The normalizer will take the column created by the VectorAssembler, normalize it and produce a new column.
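A sketch, feeding the assembler's output into a ‘features’ column for the classifier:

```scala
import org.apache.spark.ml.feature.Normalizer

// Normalize the assembled vector; "features" is the column the classifier will read.
val normalizer = new Normalizer()
  .setInputCol("features_temp")
  .setOutputCol("features")
```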
Building and Evaluating Model
We will be building our model using the LogisticRegression algorithm, which is used for classification. The variable being classified is called the dependent variable, and the other variables that decide the value of the dependent variable are called independent variables.
In logistic regression, based on the values of the independent variables, the model predicts the probability that the dependent variable takes one of its categorical values (classes). In our example there are two possible classes, 0 or 1. A LogisticRegression component can be created as shown below.
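A sketch of the component - maxIter is an arbitrary choice, and casting the label is my addition, since Spark ML classifiers expect a Double label column:

```scala
import org.apache.spark.ml.classification.LogisticRegression

// Spark ML expects the label to be of type Double, so cast Survived up front
// (assumption: the original code may handle this elsewhere).
df = df.withColumn("Survived", df("Survived").cast("double"))

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setLabelCol("Survived")
  .setFeaturesCol("features")
```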
Using all the components we have defined so far, create a Pipeline object. As already mentioned, a pipeline has a set of stages, and each component we add is a stage in the pipeline. The Pipeline will execute the stages one after another, first calling fit() (if the stage is an Estimator) and then passing the result of transform() on to the next stage.
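Using the component names from the sketches above:

```scala
import org.apache.spark.ml.Pipeline

// Stages run in order: indexers -> bucketizer -> assembler -> normalizer -> classifier.
val pipeline = new Pipeline()
  .setStages(Array(titleIndexer, sexIndexer, fareBucketizer, assembler, normalizer, lr))
```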
Training set & Test set
To evaluate the model, we will split our data into two - a training set (80%) and a test set (20%). We will build our model using the training set and evaluate it using the test set. We will use the area under the ROC curve to determine how good the model is. The input data can be split as shown below.
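For example:

```scala
// 80/20 split; the seed is arbitrary but makes the split reproducible.
val Array(trainingData, testData) = df.randomSplit(Array(0.8, 0.2), 42)
```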
We will now use the pipeline to fit our training data. The result of fitting the pipeline on our training data is a PipelineModel object, which can be used to make predictions on the test data.
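For example:

```scala
// Fit all pipeline stages on the training split; the result is a PipelineModel.
val model = pipeline.fit(trainingData)
```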
Note that the model object here is an instance of PipelineModel, not LogisticRegression. This is because LogisticRegression is only one component of our PipelineModel. Whenever a prediction is made for a data set, the data has to go through all the transformations performed by the other components in the Pipeline before it can be used by the LogisticRegression component for prediction.
To evaluate how well the model did, select the columns ‘prediction’ and ‘Survived’ from the result, create an RDD of [(Double, Double)] and pass it on to BinaryClassificationMetrics.
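A sketch of that evaluation, assuming result is the DataFrame produced by running the fitted model on the held-out split:

```scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.functions.col

// Run the fitted pipeline on the test split, then score predictions against labels.
val result = model.transform(testData)
val predictionAndLabels = result
  .select(col("prediction"), col("Survived").cast("double"))
  .rdd
  .map(row => (row.getDouble(0), row.getDouble(1)))

val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("Area under ROC = " + metrics.areaUnderROC())
```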
The resulting area under the ROC curve is not bad; check this link to read more about how to evaluate a model based on the value of the area under the ROC curve.
The prediction we just did was on our input data, where we already knew the actual classification. The reason we split the data into training and test sets is that we needed to compare the actual results with the predicted results in order to evaluate the model. Now we will use the entire input data to train the model again.
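For example (finalModel is the name I will use for this retrained model below):

```scala
// Retrain on the full training DataFrame now that the pipeline has been validated.
val finalModel = pipeline.fit(df)
```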
Doing the Prediction
Download test.csv from Kaggle and put it in your HDFS. The test data (submission data) has to go through all the loading and pre-processing steps applied to the training data, with the additional requirement of adding a ‘Survived’ column, because test.csv does not contain the column ‘Survived’. Loading and pre-processing of the test data can be done with code like the below:
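A sketch of that loading and pre-processing - load, findTitle and avgAge are the hypothetical names introduced earlier, and the original repository may structure this differently:

```scala
import org.apache.spark.sql.functions.{avg, col, lit}

// test.csv has the same columns as train.csv except Survived.
val testFeaturesArr = Seq("PassengerId", "Pclass", "Name", "Sex", "Age",
                          "SibSp", "Parch", "Ticket", "Fare", "Cabin", "Embarked")

// Load test.csv, add a placeholder Survived column, and apply the same
// feature engineering and missing-value handling used on the training data.
var submissionData = load("/kaggle/titanic/test.csv", sqlContext, testFeaturesArr)
  .withColumn("Survived", lit(0.0))
  .withColumn("Title", findTitle(col("Name")))

val avgFare = df.select(avg("Fare")).first().getDouble(0)
submissionData = submissionData.na.fill(Map("Age" -> avgAge, "Fare" -> avgFare))
```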
Use the PipelineModel object created during model building to do the prediction.
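Using the finalModel fitted on the full training data above:

```scala
// Running the PipelineModel applies every transformation and then the classifier.
val predictions = finalModel.transform(submissionData)
```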
Let us now take a look at what our model predicted for the first three passengers in the test data.
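One way to peek at them (column names as used in this walkthrough):

```scala
// Show the predicted class alongside the passenger id for the first three rows.
predictions.select("PassengerId", "prediction").show(3)
```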
The model predicted that passengers with ID 892 and 894 will not survive and Passenger 893 will survive.
Note: I received a score of 0.77512 on submitting this to Kaggle.
This concludes the post and I hope it was helpful. Thanks for reading.