
Spark Pipelines: Elegant Yet Powerful

Eric Xu is a Data Scientist and Rails Developer at Outbrain who participated in the Spark Lab workshop in New York.

Introduction

We’ve all suffered through the experience of reopening a machine learning project and trying to trace back our thought process. Oftentimes it feels like a jungle where dozens of feature engineering steps are criss-crossed with a grab-bag of hand-tuned models. If we can’t easily follow our own code, how can others?

That’s why I was excited when I learned about Spark’s Machine Learning (ML) Pipelines during the Insight Spark Lab. The Pipeline API, introduced in Spark 1.2, is a high-level API for MLlib. Inspired by the popular implementation in scikit-learn, the concept of Pipelines is to facilitate the creation, tuning, and inspection of practical ML workflows. In other words, it lets us focus more on solving a machine learning task instead of spending time organizing code.

Why Use Pipelines?

Typically during the exploratory stages of a machine learning problem, we find ourselves iterating through dozens, if not hundreds, of features and model combinations. Our thinking process could resemble something like this:

1. Start out with three features as inputs to our model. Components: Features A, B, C; Model X

2. Add a new feature. Components: Features A, B, C, D (new); Model X

3. Normalize an existing feature. Components: Features A, B (normalized), C, D; Model X

4. Use a different set of model parameters. Components: Features A, B (normalized), C, D; Model X (parameters v2)

5. Use the predictions from the first model as features for a second model. Components: Features A, B (normalized); Model X (parameters v2); then Features C, D, predictions from Model X; Model Y

Before long, our Jupyter notebook is filled with spaghetti code that takes up hundreds of cells. Ensuring that our training and test data go through an identical process is manageable, but it tends to be tedious and error-prone.

A better solution is to wrap each combination of steps with a Pipeline. This gives us a declarative interface where it’s easy to see the entire data extraction, transformation, and model training workflow.

A Spark Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage.
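To make that distinction concrete, here is a minimal sketch: a Transformer maps one DataFrame to another via transform(), while an Estimator learns from a DataFrame via fit() and returns a fitted Model, which is itself a Transformer. (reviews_df, train_df, and test_df are assumed DataFrames with the appropriate columns.)

from pyspark.ml.feature import Tokenizer
from pyspark.ml.classification import LogisticRegression

# Transformer: DataFrame in, DataFrame out
tok = Tokenizer(inputCol="review", outputCol="words")
words_df = tok.transform(reviews_df)

# Estimator: fit() learns from a DataFrame and returns a Model
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_df)

# The fitted Model is itself a Transformer
predictions = lr_model.transform(test_df)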

The code below demonstrates how multiple Transformers and Estimators can be bundled to create a complex workflow. At Insight Data Labs, I was provided with several data sources ranging from several hundred gigabytes to a terabyte. I decided to work with the Amazon reviews dataset. The raw data consists of restaurant reviews (String) and ratings (Integer), like so:

"If you’re a vegetarian, there are just some places you shouldn’t go to. Sausage factory. Pig roast. Republican National Convention. And Applebee’s." (Rating: 1)

"Unless you’re going for cocktails and appetizers, don’t try to actually eat anything at this Applebee’s. Try Chuck E. Cheese upstairs. They may have a cheese pizza option for vegetarians. They just might." (Rating: 3)

"No utensils given unless we asked. Was I supposed to dunk my face into my appetizer like a dog?" (Rating: 1)
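The pipeline code further below expects DataFrames with this schema (train_df and test_df). For a quick local run, a toy stand-in, not the actual Amazon dataset, could look like this (assuming a Spark 2.x-style SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy stand-in with the same schema as the raw data: review (string), rating (integer)
train_df = spark.createDataFrame(
    [("No utensils given unless we asked.", 1),
     ("They may have a cheese pizza option for vegetarians.", 3)],
    ["review", "rating"])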

During the feature engineering process, text features are extracted from the tokenized reviews using both the HashingTF and Word2Vec algorithms. The integer ratings are one-hot encoded with a OneHotEncoder. The feature engineering results are then combined using the VectorAssembler, before being passed to a Logistic Regression model.

from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import LogisticRegression

# Configure pipeline stages
tok = Tokenizer(inputCol="review", outputCol="words")
htf = HashingTF(inputCol="words", outputCol="tf", numFeatures=200)
w2v = Word2Vec(inputCol="words", outputCol="w2v")   # Word2Vec consumes the tokenized words
ohe = OneHotEncoder(inputCol="rating", outputCol="rc")
va = VectorAssembler(inputCols=["tf", "w2v", "rc"], outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Build the pipeline
pipeline = Pipeline(stages=[tok, htf, w2v, ohe, va, lr])

# Fit the pipeline on the training data
model = pipeline.fit(train_df)

# Make predictions on the test data
prediction = model.transform(test_df)
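Once fitted, the resulting PipelineModel keeps its fitted stages in order, which makes the whole workflow easy to inspect after the fact. A quick sketch:

# The fitted pipeline holds one fitted stage per original stage, in order
print(model.stages)

# For example, the last stage is the fitted logistic regression model
lr_model = model.stages[-1]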

The DAG diagram below visualizes the structure of the Pipeline and all of its stages. Blue pentagons represent Transformers, the yellow diamond represents an Estimator, and the boxes represent DataFrames with the current column names flowing through the Pipeline.

[Figure: DAG diagram of the Pipeline and its stages]

Custom Transformers

The Spark community is quickly adding new feature transformers and algorithms to the Pipeline API with each release. But what if we want to do something that isn’t available out of the box, like counting the number of emojis in a block of text? It turns out not to be that difficult to extend the Transformer class and create our own custom transformers.

The basic rules to follow are that a Transformer needs to:

1. implement the transform method (in PySpark, by overriding _transform)

2. specify an inputCol and outputCol

3. accept a DataFrame as input and return a DataFrame as output

The following code snippet demonstrates a naive implementation of a word count Transformer.

from pyspark.ml.util import keyword_only
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# Create a custom word count transformer class
class MyWordCounter(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(MyWordCounter, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]

        # Transformer logic: count the whitespace-separated words in each review
        def f(s):
            return len(s.split(' '))
        t = LongType()

        return dataset.withColumn(out_col, udf(f, t)(in_col))

# Instantiate the new word count transformer
wc = MyWordCounter(inputCol="review", outputCol="wc")

We can now treat MyWordCounter like any other Transformer and add it as a stage to our Pipeline.
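For instance, reusing the stage variables from the earlier snippets, the word count could be wired in as an extra feature column (a sketch; the new "wc" column simply joins the VectorAssembler inputs):

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Add the word count output to the assembled feature vector
va = VectorAssembler(inputCols=["tf", "w2v", "rc", "wc"], outputCol="features")

# Insert the custom transformer as just another stage
pipeline = Pipeline(stages=[tok, htf, w2v, ohe, wc, va, lr])
model = pipeline.fit(train_df)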

Bottom Line

Pipelines are a simple and effective way to manage complex machine learning workflows without tearing our hair out. Their power stands out even more when we get to cross-validation for hyperparameter tuning. Overall, the Pipeline API is a major step in making machine learning scalable, easy, and enjoyable.
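As a taste of that, a Pipeline can be dropped straight into Spark's CrossValidator as the estimator. The sketch below grid-searches two hyperparameters; the specific grid values and evaluator are illustrative, and a label column is assumed.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Grid over a couple of pipeline hyperparameters
grid = (ParamGridBuilder()
        .addGrid(htf.numFeatures, [100, 200])
        .addGrid(lr.regParam, [0.01, 0.1])
        .build())

# Cross-validate the entire pipeline, not just the final model
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=3)

cv_model = cv.fit(train_df)          # best parameter set, refit on the full training data
predictions = cv_model.transform(test_df)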

Already a data scientist or engineer?

Join us for a two-day advanced Apache Spark Lab led by tech industry experts.

Interested in transitioning to a career in data engineering?

Learn more about the Insight Data Engineering Fellows Program in New York and Silicon Valley.
