Hyperparameter Tuning with Scikit-Learn and Dask
Quick Start
To jump right into a notebook, start up examples-cpu in your Saturn account and run nyc-taxi/hyperparameter-dask.ipynb.
Set Up Environment
In Saturn, go to the “Projects” page and create a new project called tutorial. Choose the following settings:
- image: saturncloud/saturn:2020.10.23
- disk space: 10 Gi
- size: Medium - 2 cores - 4 GB RAM
Start up that project’s Jupyter, and go into Jupyter Lab.
Set Up Dask Cluster
Next, set up your Dask cluster. Saturn Cloud allows you to do this programmatically via dask-saturn.
from dask_saturn import SaturnCluster
from dask.distributed import Client
cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=3,
    nthreads=4,
)
client = Client(cluster)
To see the options for scheduler and worker sizes, and how they match up to the options presented in Saturn Cloud, run the following:
from dask_saturn.core import describe_sizes
describe_sizes()
The Client object is our “entry point” to Dask. Most Dask operations automatically detect the client and run across the cluster, but sometimes it's necessary to pass a client object explicitly when performing more advanced operations. The client also gives us a link to the Dask Diagnostic Dashboard, where you can watch your tasks as they get scheduled and executed on your Dask workers. We normally open the dashboard in a different tab, so we can keep an eye on it as we run our code. See the Dask documentation for a full explanation of the dashboard.
client
The following cell will block until all workers are available. You can also view cluster status and access the Dashboard link from the Project page in Saturn Cloud.
client.wait_for_workers(3)
client.run(lambda: "Ready to go!")
You can change the number of workers on the running cluster using the cluster.scale() method. Note that it will take a few minutes to spin up new workers, but you can use the wait_for_workers() method shown above to block until they’re ready.
Scikit-learn + Joblib
Scikit-learn has some algorithms that support parallel execution via the n_jobs parameter. With Joblib's default backend, this parallelizes across the cores of a single machine (in this case, our Jupyter Server), and n_jobs=-1 uses all of them. Dask provides a Joblib backend that hooks into scikit-learn algorithms to parallelize work across a Dask cluster instead. This enables us to pull in Dask just for the grid search.
import s3fs
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import GridSearchCV
s3 = s3fs.S3FileSystem(anon=True)
taxi = pd.read_csv(
    s3.open(
        's3://nyc-tlc/trip data/yellow_tripdata_2020-05.csv',
        mode='rb',
    ),
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']
)
raw_features = [
    'tpep_pickup_datetime',
    'passenger_count',
    'tip_amount',
    'fare_amount',
]
features = [
    'pickup_weekday',
    'pickup_weekofyear',
    'pickup_hour',
    'pickup_week_hour',
    'pickup_minute',
    'passenger_count',
]
label = 'tip_fraction'
def prep_df(taxi_df):
    '''
    Generate features from a raw taxi dataframe.
    '''
    df = taxi_df[taxi_df.fare_amount > 0][raw_features].copy()  # avoid divide-by-zero
    df[label] = df.tip_amount / df.fare_amount
    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.isocalendar().day
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.isocalendar().week
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [label]].astype(float).fillna(-1)
    return df
taxi_feat = prep_df(taxi)
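To make the feature definitions concrete, here is a minimal sketch that applies the same date arithmetic to two made-up trips. The trips DataFrame and its values are hypothetical and exist only for illustration; the column names follow prep_df above.

```python
import pandas as pd

# Two hypothetical trips; the values are invented for illustration.
trips = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime(
        ["2020-05-04 08:15:00", "2020-05-10 23:45:00"]  # a Monday and a Sunday
    ),
    "tip_amount": [2.0, 0.0],
    "fare_amount": [10.0, 8.0],
})

iso = trips.tpep_pickup_datetime.dt.isocalendar()
trips["tip_fraction"] = trips.tip_amount / trips.fare_amount
trips["pickup_weekday"] = iso.day  # ISO weekday: Monday=1 ... Sunday=7
trips["pickup_weekofyear"] = iso.week
trips["pickup_hour"] = trips.tpep_pickup_datetime.dt.hour
# Hour within the week, offset by the ISO weekday
trips["pickup_week_hour"] = trips.pickup_weekday * 24 + trips.pickup_hour

print(trips[["tip_fraction", "pickup_weekday", "pickup_week_hour"]])
```

Because the ISO weekday starts at 1, pickup_week_hour runs from 24 (Monday at midnight) to 191 (Sunday at 11 PM) rather than starting at 0.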
pipeline = Pipeline(steps=[
    ('scale', StandardScaler()),
    # Note: `normalize` was removed in scikit-learn 1.2; drop it on newer versions.
    ('clf', ElasticNet(normalize=False, max_iter=100, l1_ratio=0)),
])
params = {
    'clf__l1_ratio': np.arange(0, 1.1, 0.1),
    'clf__alpha': [0, 0.5, 1, 2],
}
grid_search = GridSearchCV(
    pipeline,
    params,
    cv=3,
    n_jobs=-1,
    verbose=1,
    scoring='neg_mean_squared_error',
)
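It can help to know how much work the grid search represents before sending it to a cluster. The sketch below counts the candidate parameter combinations with scikit-learn's ParameterGrid and multiplies by the number of folds. The names cv_folds, n_candidates, and n_fits are introduced here for illustration only.

```python
import numpy as np
from sklearn.model_selection import ParameterGrid

# Same grid as above
params = {
    "clf__l1_ratio": np.arange(0, 1.1, 0.1),
    "clf__alpha": [0, 0.5, 1, 2],
}
cv_folds = 3  # matches cv=3 in the GridSearchCV call

grid = ParameterGrid(params)
n_candidates = len(grid)           # every l1_ratio paired with every alpha
n_fits = n_candidates * cv_folds   # one model fit per candidate per fold
print(f"{n_candidates} candidates x {cv_folds} folds = {n_fits} fits")
```

Each of these fits is an independent task, which is why the search parallelizes well.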
Take a sample of the dataset to make things run faster.
taxi_sample = taxi_feat.sample(frac=0.1, replace=False)
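The sample above changes on every run. If you want repeatable timings or scores, one option is to pass random_state to sample(). The sketch below uses a small stand-in DataFrame (df is hypothetical) to show that two draws with the same seed match.

```python
import pandas as pd

# Stand-in for taxi_feat; 100 rows of made-up data
df = pd.DataFrame({"x": range(100)})

# Same seed, same rows
sample_a = df.sample(frac=0.1, replace=False, random_state=42)
sample_b = df.sample(frac=0.1, replace=False, random_state=42)

print(len(sample_a), sample_a.equals(sample_b))
```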
To execute the grid search in Dask, we need to run it inside a context manager for a Joblib backend. Besides that, we call the grid_search.fit() method the same way we would when using scikit-learn in a non-distributed environment. When you run this cell, watch the Dask Dashboard to see the progress.
%%time
import joblib
with joblib.parallel_backend('dask'):
    _ = grid_search.fit(
        taxi_sample[features],
        taxi_sample[label],
    )
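Once the search finishes, the fitted object exposes the results regardless of which backend ran the work. The following is a minimal sketch on synthetic data, using Joblib's local 'threading' backend as a stand-in for 'dask' so that it runs without a cluster. The data, the smaller grid, and the name search are assumptions made for illustration.

```python
import joblib
import numpy as np
from sklearn.linear_model import ElasticNet
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic regression data (not the taxi dataset)
rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))
y = X @ np.array([1.0, -2.0, 0.5]) + rng.normal(scale=0.1, size=200)

pipeline = Pipeline(steps=[
    ("scale", StandardScaler()),
    ("clf", ElasticNet(max_iter=1000)),
])
params = {"clf__l1_ratio": [0.1, 0.5, 0.9], "clf__alpha": [0.01, 0.1, 1.0]}
search = GridSearchCV(pipeline, params, cv=3, scoring="neg_mean_squared_error")

# 'threading' is a local stand-in; on a cluster this would be 'dask'
with joblib.parallel_backend("threading"):
    search.fit(X, y)

print(search.best_params_)            # winning parameter combination
print(-search.best_score_)            # mean cross-validated MSE of the winner
print(len(search.cv_results_["params"]))  # number of candidates evaluated
```

Since the scoring is neg_mean_squared_error, best_score_ is negative or zero; negating it gives the mean squared error.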
Note: Using the Dask Joblib backend requires sending the DataFrame through the scheduler to all the workers. This causes problems with DataFrames larger than what was used in this example.
Dask + Joblib is useful for small data scenarios. Our next example will work with DataFrames of any size!
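A quick way to judge whether a DataFrame is small enough for this approach is to check its in-memory size before fitting. The sketch below does this for a stand-in DataFrame with seven float columns (six features plus the label); the DataFrame and the name n_bytes are hypothetical.

```python
import numpy as np
import pandas as pd

# Stand-in for taxi_sample: 1,000 rows, 7 float64 columns
df = pd.DataFrame(np.zeros((1000, 7)), columns=[f"f{i}" for i in range(7)])

# Total bytes held by the columns and the index
n_bytes = int(df.memory_usage(index=True, deep=True).sum())
print(f"{n_bytes / 1e6:.3f} MB")
```

This is roughly the amount of data that must travel through the scheduler to the workers.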
Dask ML
Dask ML has its own parallel implementations of some scikit-learn algorithms, including GridSearchCV and other hyperparameter search options. To use it, we convert our pandas DataFrame to a Dask DataFrame and use Dask ML’s preprocessing and model selection classes. Don’t worry if you don’t know all the details of a Dask DataFrame; it’s very similar to a pandas DataFrame. For now, enjoy the speedups with Dask!
import dask.dataframe as dd
taxi_dd = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2020-05.csv',
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)
taxi_feat_dd = prep_df(taxi_dd)  # reuse the prep_df function defined earlier
taxi_sample_dd = taxi_feat_dd.sample(frac=0.1, replace=False).persist()
Notice how the following code looks almost identical to the scikit-learn version, and even still uses scikit-learn’s Pipeline and ElasticNet classes.
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import GridSearchCV
pipeline = Pipeline(steps=[
    ('scale', StandardScaler()),
    # Note: `normalize` was removed in scikit-learn 1.2; drop it on newer versions.
    ('clf', ElasticNet(normalize=False, max_iter=100, l1_ratio=0)),
])
params = {
    'clf__l1_ratio': np.arange(0, 1.1, 0.1),
    'clf__alpha': [0, 0.5, 1, 2],
}
grid_search = GridSearchCV(
    pipeline,
    params,
    cv=3,
    scoring='neg_mean_squared_error',
)
Now we can run the grid search using the grid_search object defined above. (Hint: it works the same way as scikit-learn’s GridSearchCV class.)
%%time
_ = grid_search.fit(
    taxi_sample_dd[features],
    taxi_sample_dd[label],
)
Super fast!
This ran even faster than the Joblib example, because Dask was able to parallelize all steps of the pipeline. Dask ML’s GridSearchCV class also avoids repeated work to make the grid search faster.