Erix Xu is a Data Scientist, Rails Developer at Outbrain and participated in the Spark Lab workshop in New York.
We’ve all suffered through the experience of reopening a machine learning project and trying to trace back our thought process. Often times it feels like a jungle where dozens of feature engineering steps are criss-crossed with a grab-bag of hand-tuned models. If we can’t easily follow our own code, how can others?
That’s why I was excited when I learned about Spark’s Machine Learning (ML) Pipelines during the Insight Spark Lab. The Pipeline API, introduced in Spark 1.2, is a high-level API for MLlib. Inspired by the popular implementation in scikit-learn , the concept of
Pipelines is to facilitate the creation, tuning, and inspection of practical ML workflows. In other words, it lets us focus more on solving a machine learning task, instead of wasting time spent on organizing code.
Why Use Pipelines?
Typically during the exploratory stages of a machine learning problem, we find ourselves iterating through dozens, if not hundreds, of features and model combinations. Our thinking process could resemble something like this:
|We start out with three features as inputs to our model||– Features A, B, C- Model X|
|Add a new feature||– Features A, B, C, D new- Model X|
|Normalize an existing feature||– Features A, B normalized, C, D- Model X|
|Use a different set of model parameters||– Features A, B normalized, C, D- Model X parameters v2|
|Use the predictions from first model as features for a second model|| – Features A, B normalized
– Model X parameters v2
– Features C, D, predictions from Model X
– Model Y
Before long, our Jupyter notebook is filled with spaghetti code that takes up hundreds of cells. Trying to ensure that our training and test data go through the identical process is manageable, but also tends to be tedious and error prone.
A better solution is to wrap each combination of steps with a
Pipeline . This gives us a declarative interface where it’s easy to see the entire data extraction, transformation, and model training workflow.
Pipeline is specified as a sequence of stages, and each stage is either a
Transformer or an
Estimator . These stages are run in order, and the input
DataFrame is transformed as it passes through each stage.
The code below demonstrates how multiple
Estimators can be bundled to create a complex workflow. At Insight Data Labs, I was provided with several data sources ranging from several hundred gigabytes to a terabyte. I decided on working with the Amazon reviews dataset. The raw data consists of restaurant reviews (String) and ratings (Integers), like so:
|If you’re a vegetarian, there are just some places you shouldn’t go to. Sausage factory. Pig roast. Republican National Convention. And Applebee’s.||1|
|Unless you’re going for cocktails and appetizers, don’t try to actually eat anything at this Applebee’s. Try Chuck E. Cheese upstairs. They may have a cheese pizza option for vegetarians. They just might.||3|
|No utensils given unless we asked. Was I supposed to dunk my face into my appetizer like a dog?||1|
During the feature engineering process, text features are extracted from the raw reviews using both the
Word2Vec algorithms. The ratings data are binarized with a
OneHotEncoder . The feature engineering results are then combined using the
VectorAssembler , before being passed to a Logistic Regression model.
from pyspark.ml import Pipeline from pyspark.ml.feature import * from pyspark.ml.classification import LogisticRegression # Configure pipeline stages tok = Tokenizer(inputCol="review", outputCol="words") htf = HashingTF(inputCol="words", outputCol="tf", numFeatures=200) w2v = Word2Vec(inputCol="review", outputCol="w2v") ohe = OneHotEncoder(inputCol="rating", outputCol="rc") va = VectorAssembler(inputCols=["tf", "w2v", "rc"], outputCol="features") lr = LogisticRegression(maxIter=10, regParam=0.01) # Build the pipeline pipeline = Pipeline(stages=[tok, htf, w2v, ohe, va, lr]) # Fit the pipeline model = pipeline.fit(train_df) # Make a prediction prediction = model.transform(test_df)
This DAG diagram visualizes the structure of the
Pipeline and all of its stages. Blue pentagons represent
Transformers , the yellow diamond represents an
Estimator , and the boxes represent
DataFrames with the current column names flowing through the
The Spark community is quickly adding new feature transformers and algorithms for the Pipeline API with each version release. But what if we wanted to do something outside of the box like count the number of emojis in a block of text? It turns out to be not that difficult to extend the
Transformer class and create our own custom transformers.
The basic rules to follow are that a
Transformer needs to:
1. implement the
2. specify an
3. accept a
DataFrame as input and return a
DataFrame as output
The follow code snippet demonstrates a naive implementation of a word count
from pyspark.ml.util import keyword_only from pyspark.ml.pipeline import Transformer from pyspark.ml.param.shared import HasInputCol, HasOutputCol # Create a custom word count transformer class class MyWordCounter(Transformer, HasInputCol, HasOutputCol): @keyword_only def __init__(self, inputCol=None, outputCol=None): super(WordCounter, self).__init__() kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only def setParams(self, inputCol=None, outputCol=None): kwargs = self.setParams._input_kwargs return self._set(**kwargs) def _transform(self, dataset): out_col = self.getOutputCol() in_col = dataset[self.getInputCol()] # Define transformer logic def f(s): return len(s.split(' ')) t = LongType() return dataset.withColumn(out_col, udf(f, t)(in_col)) # Instantiate the new word count transformer wc = MyWordCounter(inputCol="review", outputCol="wc")
We can now treat
MyWordCounter like any other
Transformer and add it as a stage to our
Pipelines are a simple and effective way to manage complex machine learning workflows without tearing our hair out. It’s power stands out even more when we get to cross-validation for hyperparameter tuning. Overall, the Pipeline API is a major step in making machine learning scalable, easy, and enjoyable.
Already a data scientist or engineer?
Join us for a two-day advancedApache Spark Lab led by tech industry experts.
Interested in transitioning to career in data engineering?
Learn more about the Insight Data Engineering Fellows Program in New York and Silicon Valley.