Hyperparameter Tuning with Scikit-Learn and Dask

Hyperparameter searching is an example of a compute-bound workload. The data fits comfortably into the memory of the Jupyter Server, but the grid search still takes some time to execute. Let’s take this workflow and parallelize it with Dask!

Quick Start

To jump right into a notebook, start up examples-cpu in your Saturn account and run nyc-taxi/hyperparameter-dask.ipynb.

Set Up Environment

In Saturn, go to the “Projects” page and create a new project called tutorial. Choose the following settings:

  • image: saturncloud/saturn:2020.10.23
  • disk space: 10 Gi
  • size: Medium - 2 cores - 4 GB RAM

Start up that project’s Jupyter server, and open JupyterLab.

Set Up Dask Cluster

Next, set up your Dask cluster. Saturn Cloud allows you to do this programmatically via dask-saturn.

from dask_saturn import SaturnCluster
from dask.distributed import Client

cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=3,
    nthreads=4,
)
client = Client(cluster)

To see the options for scheduler and worker sizes, and how they match up to the options presented in Saturn Cloud, run the following:

from dask_saturn.core import describe_sizes
describe_sizes()

The Client object is our “entry point” to Dask. Most Dask operations will automatically detect the client and run operations across the cluster, but sometimes it’s necessary to pass a Client object explicitly when performing more advanced operations. It also gives us a link to the Dask Diagnostic Dashboard, where you can watch your tasks as they get scheduled and executed on your Dask workers. We normally open that in a different tab so we can keep an eye on it as we run our code. See the Dask documentation for a full explanation of the dashboard.

client
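If you want the Dashboard URL itself (for example, to open it in another browser tab), the Client object also exposes it as an attribute:

# URL of the Dask Diagnostic Dashboard for this cluster
print(client.dashboard_link)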

The following cell will block until all workers are available. You can also view cluster status and access the Dashboard link from the Project page in Saturn Cloud.

client.wait_for_workers(3)
client.run(lambda: "Ready to go!")

You can change the number of workers on the running cluster with the cluster.scale() method. Note that it takes a few minutes to spin up new workers, but you can use wait_for_workers() as shown above to block until they’re ready.
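For example, here is a minimal sketch of scaling up (the worker count of 6 is just for illustration):

# Ask Saturn to provision 6 workers total (example value)
cluster.scale(6)

# Block until all 6 workers have joined the cluster
client.wait_for_workers(6)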

Scikit-learn + Joblib

Scikit-learn has some algorithms that support parallel execution via the n_jobs parameter. By default, this parallelizes across all cores on a single machine (in this case, our Jupyter Server). Dask provides a Joblib backend that hooks into scikit-learn algorithms to parallelize work across a Dask cluster. This enables us to pull in Dask just for the grid search.

import s3fs
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import GridSearchCV

s3 = s3fs.S3FileSystem(anon=True)

taxi = pd.read_csv(
    s3.open(
        's3://nyc-tlc/trip data/yellow_tripdata_2020-05.csv',
        mode='rb',
    ),
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']
)

raw_features = [
    'tpep_pickup_datetime',
    'passenger_count',
    'tip_amount',
    'fare_amount',
]
features = [
    'pickup_weekday',
    'pickup_weekofyear',
    'pickup_hour',
    'pickup_week_hour',
    'pickup_minute',
    'passenger_count',
]
label = 'tip_fraction'

def prep_df(taxi_df):
    '''
    Generate features from a raw taxi dataframe.
    '''
    df = taxi_df[taxi_df.fare_amount > 0][raw_features].copy()  # avoid divide-by-zero
    df[label] = df.tip_amount / df.fare_amount

    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.isocalendar().day
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.isocalendar().week
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [label]].astype(float).fillna(-1)

    return df

taxi_feat = prep_df(taxi)

pipeline = Pipeline(steps=[
    ('scale', StandardScaler()),
    ('clf', ElasticNet(normalize=False, max_iter=100, l1_ratio=0)),
])

params = {
    'clf__l1_ratio': np.arange(0, 1.1, 0.1),
    'clf__alpha': [0, 0.5, 1, 2],
}

grid_search = GridSearchCV(
    pipeline,
    params,
    cv=3,
    n_jobs=-1,
    verbose=1,
    scoring='neg_mean_squared_error',
)

Take a sample of the dataset to make things run faster.

taxi_sample = taxi_feat.sample(frac=0.1, replace=False)

To execute the grid search on Dask, we need to run it inside a context manager for the Dask Joblib backend. Beyond that, we call the grid_search.fit() method the same way we would when using scikit-learn in a non-distributed environment. When you run this cell, watch the Dask Dashboard to see the progress.

%%time
import joblib

with joblib.parallel_backend('dask'):
    _ = grid_search.fit(
        taxi_sample[features],
        taxi_sample[label],
    )

Note: Using the Dask Joblib backend requires sending the DataFrame through the scheduler to all the workers. This can cause problems with DataFrames much larger than the one used in this example.

Dask + Joblib is useful for small data scenarios. Our next example will work with DataFrames of any size!

Dask ML

Dask ML has its own parallel implementations of some scikit-learn algorithms, including GridSearchCV and other hyperparameter search options. To use it, we convert our pandas DataFrame to a Dask DataFrame and use Dask ML’s preprocessing and model selection classes. Don’t worry if you don’t know all the details of a Dask DataFrame - it’s very similar to a pandas DataFrame. For now, enjoy the speedups with Dask!

import dask.dataframe as dd

taxi_dd = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2020-05.csv',
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)

taxi_feat_dd = prep_df(taxi_dd)

# .persist() computes the sample once and keeps it in distributed memory on the workers
taxi_sample_dd = taxi_feat_dd.sample(frac=0.1, replace=False).persist()

Notice how the following code looks almost identical to the scikit-learn version; it still uses scikit-learn’s Pipeline and ElasticNet classes.

import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet

from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import GridSearchCV

pipeline = Pipeline(steps=[
    ('scale', StandardScaler()),
    ('clf', ElasticNet(normalize=False, max_iter=100, l1_ratio=0)),
])

params = {
    'clf__l1_ratio': np.arange(0, 1.1, 0.1),
    'clf__alpha': [0, 0.5, 1, 2],
}

grid_search = GridSearchCV(
    pipeline,
    params,
    cv=3,
    scoring='neg_mean_squared_error',
)

Now we can run the grid search using the grid_search object defined above. (Hint: it works the same way as scikit-learn’s GridSearchCV class).

%%time
_ = grid_search.fit(
    taxi_sample_dd[features],
    taxi_sample_dd[label],
)

Super fast!

This ran even faster than the Joblib example, because Dask was able to parallelize all steps of the pipeline. Dask ML’s GridSearchCV class also avoids repeated work to make the grid search faster.
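Once the fit completes, you can inspect the results through the same attributes that scikit-learn’s GridSearchCV exposes (Dask ML mirrors that API). A quick sketch:

# Best hyperparameter combination found by the search
print(grid_search.best_params_)

# Cross-validated score of the best combination (negative MSE, per the scoring above)
print(grid_search.best_score_)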