Training with Large Datasets

Use Dask ML to train a linear model to predict tip_fraction and save out the model for later use.

Set Up Environment

In Saturn, go to the “Projects” page and create a new project called tutorial. Choose the following settings:

  • image: saturncloud/saturn:2020.10.23
  • disk space: 10 Gi
  • size: Medium - 2 cores - 4 GB RAM

Start up that project’s Jupyter server and open JupyterLab.

Set Up Dask cluster

Next, set up your Dask cluster. Saturn Cloud allows you to do this programmatically via dask-saturn.

from dask_saturn import SaturnCluster
from dask.distributed import Client

cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=3,
    nthreads=4,
)
client = Client(cluster)
client.wait_for_workers(3)

client

Load data

We can load the data directly from S3 using dd.read_csv.

import dask.dataframe as dd

taxi = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv',
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)
taxi
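
At this point nothing has been read from S3 yet; Dask has only built a task graph. A few optional checks show what is available cheaply from metadata and what triggers computation on the cluster:

# partition count and dtypes come from metadata, so no data is read
taxi.npartitions
taxi.dtypes

# len() computes across the cluster, so it takes a moment
len(taxi)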

Feature Engineering

Notice that this feature engineering code is exactly the same as what we would write in pandas. Dask’s DataFrame API matches the pandas API in many places. Check out the Dask DataFrame docs for more information on what is and is not supported from the pandas API.

# specify feature and label column names
raw_features = [
    'tpep_pickup_datetime',
    'passenger_count',
    'tip_amount',
    'fare_amount',
]
features = [
    'pickup_weekday',
    'pickup_weekofyear',
    'pickup_hour',
    'pickup_week_hour',
    'pickup_minute',
    'passenger_count',
]
label = 'tip_fraction'

def prep_df(taxi_df):
    '''
    Generate features from a raw taxi dataframe.
    '''
    df = taxi_df[taxi_df.fare_amount > 0][raw_features].copy()  # avoid divide-by-zero
    df[label] = df.tip_amount / df.fare_amount

    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.isocalendar().day
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.isocalendar().week
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [label]].astype(float).fillna(-1)

    return df

taxi_feat = prep_df(taxi)
taxi_feat.head()

Train model with large dataset

We’ll train a linear model to predict tip_fraction. We define a Pipeline to encompass both feature scaling and model training. This will be useful later when performing a grid search.

We’ll evaluate the model against the test set using RMSE and save the fitted model for later use. First, we need to split the taxi_feat DataFrame into train and test sets using the dask_ml.model_selection.train_test_split function.

Notice that this function works the same as sklearn.model_selection.train_test_split!

from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    taxi_feat[features],
    taxi_feat[label],
    test_size=0.3,
    random_state=42
)

Due to Dask’s lazy evaluation, these arrays have not been computed yet. To make sure the rest of our ML code runs quickly, let’s kick off computation on the cluster by persisting the arrays. Note that the dask.persist function accepts multiple objects at once, rather than calling .persist() on each individually. This is helpful for objects that share upstream tasks: Dask will avoid re-computing the shared tasks.

%%time
import dask
from dask.distributed import wait

X_train, X_test, y_train, y_test = dask.persist(
    X_train, X_test, y_train, y_test,
)
# block until the persisted data is in memory on the cluster
_ = wait([X_train, X_test, y_train, y_test])

Dask ML models

The dask-ml package has parallel implementations of machine learning algorithms that do not have parallel versions in scikit-learn or other packages. These currently cover linear models and clustering.

from sklearn.pipeline import Pipeline

from dask_ml.linear_model import LinearRegression
from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import GridSearchCV

lr = Pipeline(steps=[
    ('scale', StandardScaler()),
    ('clf', LinearRegression(penalty='l2', max_iter=100)),
])

Now we are ready to train our model. Before we train, we’ll coerce our training and test sets from dask.dataframe objects to dask.array objects. Passing lengths=True computes the chunk sizes up front; converting from a DataFrame otherwise leaves them unknown, and the estimators need arrays with known chunk sizes.

X_train_arr = X_train.to_dask_array(lengths=True)
y_train_arr = y_train.to_dask_array(lengths=True)
X_test_arr = X_test.to_dask_array(lengths=True)
y_test_arr = y_test.to_dask_array(lengths=True)
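
With lengths=True the chunk sizes are known up front; you can confirm this if you like:

X_train_arr.chunks  # row counts per chunk, plus the number of feature columns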

Let’s train the lr model with X_train_arr and y_train_arr as input.

Note: this will take a few minutes because we are training on a fairly large dataset. You can scale up your cluster if you want it to run faster, as shown below.
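
For example, to scale out to 10 workers and wait for them to come up before training (the worker count here is just an illustration):

cluster.scale(10)
client.wait_for_workers(10)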

%%time

lr_fitted = lr.fit(
    X_train_arr,
    y_train_arr,
)

We can evaluate the predictions using RMSE. Passing squared=False to mean_squared_error returns the root of the mean squared error.

from dask_ml.metrics import mean_squared_error

lr_preds = lr_fitted.predict(X_test_arr)
mean_squared_error(y_test_arr, lr_preds, squared=False)

Then save the fitted pipeline as a pickle file for later use.

import cloudpickle

with open('/tmp/model.pkl', 'wb') as f:
    cloudpickle.dump(lr_fitted, f)
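
To check the round trip, you can load the pickle back and predict with it (reusing X_test_arr from above):

with open('/tmp/model.pkl', 'rb') as f:
    lr_from_disk = cloudpickle.load(f)

lr_from_disk.predict(X_test_arr)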

Incremental learning

Dask ML can hook into scikit-learn’s incremental training features with the Incremental meta-estimator. Any model that implements a partial_fit() method can be used with it.
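
As a minimal sketch, assuming the X_train_arr and y_train_arr arrays from above, you could wrap scikit-learn’s SGDRegressor (one example of an estimator with a partial_fit() method):

from sklearn.linear_model import SGDRegressor
from dask_ml.wrappers import Incremental

# Incremental passes each block of the Dask arrays to the estimator's partial_fit()
inc = Incremental(SGDRegressor(), scoring='neg_mean_squared_error')
inc.fit(X_train_arr, y_train_arr)

In practice you would still scale the features first, since SGD-based models are sensitive to feature scale.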