Predict Over Large Datasets

We can’t load a large dataset into a single pandas DataFrame. If we needed to predict over a large dataset, we could batch it ourselves and collect the results, but it’s easier to use the same pandas interface with a Dask DataFrame.
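
To see what that manual batching would involve, here is a rough sketch. The file pattern and the score_batch function are hypothetical, shown only for contrast with the Dask approach used in the rest of this tutorial:

import glob

import pandas as pd

def score_batch(df):
    # Placeholder: in practice this would call model.predict(...)
    return df.assign(prediction=0.0)

results = []
for path in glob.glob('data/part-*.csv'):  # hypothetical file layout
    batch = pd.read_csv(path)              # load one manageable chunk
    results.append(score_batch(batch))     # predict on that chunk

predictions = pd.concat(results)           # collect the results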

Set Up Environment

In Saturn, go to the “Projects” page and create a new project called tutorial. Choose the following settings:

  • image: saturncloud/saturn:2020.10.23
  • disk space: 10 Gi
  • size: Medium - 2 cores - 4 GB RAM

Start up that project’s Jupyter, and go into JupyterLab.

Overview of map_partitions

The map_partitions method allows execution of arbitrary functions on the partitions of the Dask DataFrame. Remember these partitions are just pandas DataFrames, so any code that works with pandas works here! This enables us to execute a function that performs predictions with a pre-trained model.

First, let’s get a handle on how to use the map_partitions function with a toy example.

from dask.datasets import timeseries

ts = timeseries()
ts.head()

In this dataset, each partition corresponds to one day and there is one row for every second. We can write a function that creates a new column z that is the product of x and y. We’ll run it on the first partition to make sure it works how we expect.

ts_part = ts.partitions[0].compute()

def myfunc(df):
    df['z'] = df['x'] * df['y']
    return df['z']

myfunc(ts_part)

Next, we can run that function on our larger Dask DataFrame using map_partitions.

out = ts.map_partitions(myfunc)

Dask will attempt to infer the data type of the output of the function passed to map_partitions. To be more explicit, you should pass a meta= argument describing the data type of the output.

out = ts.map_partitions(
    myfunc,
    meta=('z', 'float64'),  # myfunc returns a Series, so meta is (name, dtype)
)
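
The result is still lazy at this point. As an optional sanity check (not part of the original flow), you can compute just the first partition and compare it against the pandas result from above:

# Compute only the first partition of the lazy result and compare it with
# the pandas Series we produced earlier.
out.partitions[0].compute().equals(myfunc(ts_part))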

Predict on Previously Trained Model

Now let’s use map_partitions to make predictions from a previously trained model. We’ll load the model that was trained with scikit-learn and saved in the training tutorial.
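
If you haven’t completed the training tutorial, nothing will exist at /tmp/model.pkl. As a stand-in so the rest of this notebook can run, you could train and save a small scikit-learn pipeline yourself; the sketch below is an assumed substitute, not the actual model from that tutorial, so its predictions won’t be meaningful.

import numpy as np
import cloudpickle
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import ElasticNet

# Stand-in only: fit a small pipeline on random data with the same number of
# columns as the feature list used later in this tutorial (6 features).
rng = np.random.RandomState(42)
X_stub = rng.uniform(size=(1000, 6))
y_stub = rng.uniform(size=1000)

stand_in = Pipeline(steps=[
    ('scale', StandardScaler()),
    ('clf', ElasticNet(max_iter=100, l1_ratio=0)),
])
stand_in.fit(X_stub, y_stub)

with open('/tmp/model.pkl', 'wb') as f:
    cloudpickle.dump(stand_in, f)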

import cloudpickle

with open('/tmp/model.pkl', 'rb') as f:
    model = cloudpickle.load(f)

Set Up Dask Cluster

Next, set up your Dask cluster. Saturn Cloud allows you to do this programmatically via dask-saturn.

from dask_saturn import SaturnCluster
from dask.distributed import Client

cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=3,
    nthreads=4,
)
client = Client(cluster)
client.wait_for_workers(3)

client

Load Data

We can load the data directly from S3 using dd.read_csv.

import dask.dataframe as dd

taxi = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv',
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)
taxi
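
Nothing is read into memory yet; each CSV file is split into one or more partitions (governed by the blocksize argument of dd.read_csv). A couple of quick ways to inspect the layout and a sample of the data:

# Number of pandas DataFrames that make up this Dask DataFrame
taxi.npartitions

# Peek at the first few rows (only reads enough data to satisfy the request)
taxi.head()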

Feature Engineering

Notice that this feature engineering code is exactly the same as what we would write in pandas. Dask’s DataFrame API matches the pandas API in many places. Check out the Dask DataFrame docs for more information on what is and is not supported from the pandas API.

# specify feature and label column names
raw_features = [
    'tpep_pickup_datetime',
    'passenger_count',
    'tip_amount',
    'fare_amount',
]
features = [
    'pickup_weekday',
    'pickup_weekofyear',
    'pickup_hour',
    'pickup_week_hour',
    'pickup_minute',
    'passenger_count',
]
label = 'tip_fraction'

def prep_df(taxi_df):
    '''
    Generate features from a raw taxi dataframe.
    '''
    df = taxi_df[taxi_df.fare_amount > 0][raw_features].copy()  # avoid divide-by-zero
    df[label] = df.tip_amount / df.fare_amount

    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.isocalendar().day
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.isocalendar().week
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [label]].astype(float).fillna(-1)

    return df

taxi_feat = prep_df(taxi)
taxi_feat.head()

Prediction

First we’ll write a function that uses the model to make a prediction for a given input DataFrame, then execute it with map_partitions across the entire taxi_feat DataFrame.

The output of the function should be a pd.Series object that has predictions for each row in the input DataFrame. We’ll validate that the function works properly by executing it with taxi_feat_part as input before trying it with map_partitions. The output should look something like:

0         0.164296
1         0.166451
            ...
717799    0.165269
717800    0.168916
Length: 717801, dtype: float64

import pandas as pd

taxi_feat_part = taxi_feat.partitions[0].compute()

def predict(df):
    preds = model.predict(df[features])
    return pd.Series(preds)

predict(taxi_feat_part)

Once we’ve seen that the function works on one partition (one pandas DataFrame), we can confidently call map_partitions to run it on the whole Dask DataFrame.

preds_dask = taxi_feat.map_partitions(
    predict,
    meta=pd.Series(dtype='float64'),
)
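
preds_dask is still a lazy Dask Series at this point; no predictions have been computed yet. As an optional check before computing a metric over everything, you can materialize a small piece:

# Triggers prediction on the first partition only and shows the first rows
preds_dask.head()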

We can check the RMSE of those predictions using Dask ML.

from dask_ml.metrics import mean_squared_error

mean_squared_error(
    taxi_feat[label].values,
    preds_dask.values,
    squared=False,
)

ParallelPostFit Wrapper

Dask ML also has a ParallelPostFit meta-estimator that wraps a scikit-learn model for parallelized predictions. This is useful when you know up front that a model will be trained on a small amount of data but predictions need to be made over a much larger dataset.

from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from sklearn.preprocessing import StandardScaler

from dask_ml.wrappers import ParallelPostFit
from dask_ml.metrics import mean_squared_error

pipeline = Pipeline(steps=[
    ('scale', StandardScaler()),
    ('clf', ElasticNet(normalize=False, max_iter=100, l1_ratio=0)),
])

ppf = ParallelPostFit(estimator=pipeline)
ppf_fitted = ppf.fit(taxi_feat_part[features], taxi_feat_part[label])
preds_dask = ppf_fitted.predict(taxi_feat[features])

mean_squared_error(
    taxi_feat[label].values,
    preds_dask,
    squared=False,
)