Grid Search with Scikit-learn and Dask

This example demonstrates how a scikit-learn grid search can be accelerated with Dask parallelization. Run it yourself, or use it as a starting point for your own code.

Setup

Hyperparameter tuning is a crucial, and often painful, part of building machine learning models. Squeezing every last bit of performance out of your model can mean the difference of millions of dollars in ad revenue, or life and death for patients in healthcare models. Even if your model takes only one minute to train, you can end up waiting hours for a grid search to complete (think a 10×10 grid, cross-validation, and so on). Every time you wait for a search to finish, your iteration cycle is broken, and it takes longer to produce value with your model.

You can improve the speed of your hyperparameter search by replacing a few lines of your scikit-learn pipeline with Dask code, turning a parameter search that might otherwise run overnight into one that finishes in minutes.

Set up cluster and ensure it is running

The Client object is our “entry point” to Dask. Most Dask operations will automatically detect the client and run across the cluster, but sometimes it's necessary to pass a client object explicitly when performing more advanced operations.

from dask_saturn import SaturnCluster
from dask.distributed import Client

cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=3,
    nthreads=4,
)
client = Client(cluster)

client.wait_for_workers(3)
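
Once the workers are up, it can be useful to confirm the cluster is healthy and grab the dashboard URL so you can watch tasks execute. A minimal check (the exact URL depends on your deployment):

# Link to the Dask Dashboard for this cluster
print(client.dashboard_link)

# Summary of the scheduler and connected workers
print(client)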

Joblib Version

This version does not use Dask DataFrames, but it does parallelize the grid search. Scikit-learn has some algorithms that support parallel execution via the n_jobs parameter. By default, this parallelizes across all cores on a single machine (in this case, our Jupyter server). Dask provides a Joblib backend that hooks into these scikit-learn algorithms and parallelizes the work across a Dask cluster. This enables us to pull in Dask just for the grid search.

import s3fs
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import GridSearchCV

Create pandas dataframe for training data

s3 = s3fs.S3FileSystem(anon=True)

taxi = pd.read_csv(
    s3.open(
        's3://nyc-tlc/trip data/yellow_tripdata_2020-05.csv',
        mode='rb',
    ),
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']
)

Specify feature and label column names

raw_features = [
    'tpep_pickup_datetime',
    'passenger_count',
    'tip_amount',
    'fare_amount',
]
features = [
    'pickup_weekday',
    'pickup_weekofyear',
    'pickup_hour',
    'pickup_week_hour',
    'pickup_minute',
    'passenger_count',
]
label = 'tip_fraction'

Clean data

def prep_df(taxi_df):
    '''
    Generate features from a raw taxi dataframe.
    '''
    df = taxi_df[taxi_df.fare_amount > 0][raw_features].copy()  # avoid divide-by-zero
    df[label] = df.tip_amount / df.fare_amount

    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.isocalendar().day
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.isocalendar().week
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [label]].astype(float).fillna(-1)

    return df

taxi_feat = prep_df(taxi)
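
It can be helpful to sanity-check the engineered features before training. For example:

# Quick look at the engineered features and label
print(taxi_feat.shape)
taxi_feat.head()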

Set up training grid

pipeline = Pipeline(steps=[
    ('scale', StandardScaler()),
    # 'normalize' was removed from ElasticNet in scikit-learn 1.2; scaling is handled by StandardScaler above
    ('clf', ElasticNet(max_iter=100, l1_ratio=0)),
])

params = {
    'clf__l1_ratio': np.arange(0, 1.1, 0.1),
    'clf__alpha': [0, 0.5, 1, 2],
}

grid_search = GridSearchCV(
    pipeline,
    params,
    cv=3,
    n_jobs=-1,
    verbose=1,
    scoring='neg_mean_squared_error',
)
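
With this grid (11 values of clf__l1_ratio × 4 values of clf__alpha) and 3-fold cross-validation, the search fits the pipeline 132 times (plus one final refit on the best parameters), which is why parallelizing it pays off:

# 11 l1_ratio values x 4 alpha values x 3 CV folds = 132 pipeline fits
n_candidates = len(params['clf__l1_ratio']) * len(params['clf__alpha'])
print(n_candidates * 3)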

Run training

To execute the grid search on Dask, we need to run it inside a context manager for the Joblib backend. Other than that, we call the grid_search.fit() method the same way as we would when using scikit-learn in a non-distributed environment. When you run this cell, watch the Dask Dashboard to see the progress.

import joblib

taxi_sample = taxi_feat.sample(frac=0.1, replace=False)

with joblib.parallel_backend('dask'):
    _ = grid_search.fit(
        taxi_sample[features],
        taxi_sample[label],
    )
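
When the search finishes, the fitted grid_search object exposes the usual scikit-learn attributes. For example:

# Best hyperparameters and the corresponding cross-validated score
print(grid_search.best_params_)
print(grid_search.best_score_)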

Note: Using the Dask Joblib backend requires sending the DataFrame through the scheduler to all of the workers, which can cause problems for DataFrames substantially larger than the one used in this example.
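
One way to reduce that overhead, assuming you are on a recent version of dask.distributed, is to pre-scatter the training data to the workers via the backend's scatter argument; a minimal sketch:

# Pre-scatter the training data so it is not shipped with every task
X = taxi_sample[features]
y = taxi_sample[label]

with joblib.parallel_backend('dask', scatter=[X, y]):
    _ = grid_search.fit(X, y)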

Dask-ML Version

This version simplifies and accelerates the grid search by using Dask DataFrames. It assumes the cluster is set up as shown at the top of this document. Dask-ML has its own parallel implementations of some scikit-learn algorithms, including GridSearchCV and other hyperparameter search options. To use them, we load the data directly into a Dask DataFrame and use Dask-ML's preprocessing and model selection classes.

Create dask dataframe for training data

import dask.dataframe as dd

taxi_dd = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2020-05.csv',
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)

Clean data

def prep_df(taxi_df):
    '''
    Generate features from a raw taxi dataframe.
    '''
    df = taxi_df[taxi_df.fare_amount > 0][raw_features].copy()  # avoid divide-by-zero
    df[label] = df.tip_amount / df.fare_amount

    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.isocalendar().day
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.isocalendar().week
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [label]].astype(float).fillna(-1)

    return df

taxi_feat_dd = prep_df(taxi_dd)

taxi_sample_dd = taxi_feat_dd.sample(frac=0.1, replace=False).persist()
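
Because Dask is lazy, persist() kicks off computation of the sampled, feature-engineered data on the cluster and keeps the result in distributed memory, so it is only computed once during the grid search. You can check the materialized sample once it is ready. For example:

# Number of partitions and rows in the persisted training sample
print(taxi_sample_dd.npartitions)
print(len(taxi_sample_dd))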

import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import GridSearchCV

Set up training grid

Notice how the following code looks almost identical to the scikit-learn version, and even still uses scikit-learn’s Pipeline and ElasticNet classes.

pipeline = Pipeline(steps=[
    ('scale', StandardScaler()),
    # 'normalize' was removed from ElasticNet in scikit-learn 1.2; scaling is handled by StandardScaler above
    ('clf', ElasticNet(max_iter=100, l1_ratio=0)),
])

params = {
    'clf__l1_ratio': np.arange(0, 1.1, 0.1),
    'clf__alpha': [0, 0.5, 1, 2],
}

grid_search = GridSearchCV(
    pipeline,
    params,
    cv=3,
    scoring='neg_mean_squared_error',
)

Run training

Now we can run the grid search using the grid_search object defined above. (Hint: it works the same way as scikit-learn’s GridSearchCV class).

_ = grid_search.fit(
    taxi_sample_dd[features],
    taxi_sample_dd[label],
)
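
As with the scikit-learn version, the fitted search exposes the familiar attributes such as best_params_, best_score_, and best_estimator_. For example:

# Inspect the best hyperparameters and cross-validated score
print(grid_search.best_params_)
print(grid_search.best_score_)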


