Loading Data with Dask

Our large dataset for this notebook will be NYC taxi data from all of 2019. Rather than load the data with pandas’ pd.read_csv, we will use Dask’s dd.read_csv method. We’ll also look at how to load messier data.

Set Up Environment

In Saturn, go to the “Projects” page and create a new project called tutorial. Choose the following settings:

  • image: saturncloud/saturn:2020.10.23
  • disk space: 10 Gi
  • size: Medium - 2 cores - 4 GB RAM

Start up that project’s Jupyter, and go into Jupyter Lab.

Set Up Dask Cluster

Next, set up your Dask cluster. Saturn Cloud allows you to do this programmatically via dask-saturn.

from dask_saturn import SaturnCluster
from dask.distributed import Client

cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=3,
    nthreads=4,
)
client = Client(cluster)
client.wait_for_workers(3)

client

Using dd.read_csv

The dd.read_csv method accepts glob syntax for loading multiple files. It also accepts S3 URLs and many other kinds of paths.

We need to pass a couple of extra arguments to dd.read_csv:

  • storage_options=...: this tells Dask to use anonymous S3 access (we did this with s3.open for pandas)
  • assume_missing=True: this tells Dask to read all numeric columns as floats. Dask sometimes needs type information up front to be able to parallelize tasks effectively.

import dask.dataframe as dd

taxi = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv',
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)
taxi

If you run the above code, you’ll notice that previewing the taxi object doesn’t print out the contents of the DataFrame, as it would with pandas. This is because Dask has not yet loaded any data. It does tell us the number of partitions (i.e. little pandas DataFrames) the big Dask DataFrame has.
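
For example, we can inspect the DataFrame’s metadata without reading any data at all. A quick sketch (the exact partition count depends on how Dask chunks the files):

# Metadata only - neither of these lines loads any data
print(taxi.npartitions)  # how many pandas DataFrames make up the Dask DataFrame
print(taxi.dtypes)       # column types Dask inferred while building the graph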

When you do a calculation on this data, it’ll take some time, because it needs to read the data.

len(taxi)

If you try taxi.shape, though, you’ll get some unfamiliar output. This is because of Dask’s lazy evaluation - Dask doesn’t perform any operations until asked to. len is a special case that triggers computation. If we want to get the row count out of taxi.shape, we need to run .compute() on the delayed object.

taxi.shape[0].compute()

.compute() triggers the computation and returns the result to your local session. Be careful though, because if you run taxi.compute() Dask will return the entire big DataFrame as a single pandas object (this will certainly blow up the kernel!).
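
If you just want to peek at the data, a safer option is to pull down only a small piece of it. For example, taxi.head() needs to read just enough of the first partition to return a handful of rows:

# Returns the first five rows as a small pandas DataFrame
taxi.head()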

It is useful in many cases to trigger computation on objects even if you don’t want to pull them down to the Jupyter Server. For this we use .persist(), which triggers all computations performed on the DataFrame and holds the results in memory across the cluster. This becomes useful when we perform machine learning operations later, as we don’t want Dask to be re-parsing CSV files in each iteration of model training.

taxi = taxi.persist()
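
.persist() returns immediately while the loading continues in the background on the cluster. If you want to block until everything is in memory (an optional step, but handy before timing later work), you can wait on the persisted object:

from dask.distributed import wait

# Blocks until all partitions of taxi are held in cluster memory
wait(taxi)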

Messy data - dask.delayed

Data files aren’t always provided in a clean tabular form that’s readable with a read_* method from pandas or Dask. With dask.delayed functions, we can write a function that processes a single chunk of raw data and then tell Dask to collect the results into a Dask DataFrame. We’ll illustrate that now with the CSV files, but it’s always better to use a dd.read_* method if your data supports it. When you need it, though, dask.delayed is very flexible and powerful - chances are you will use it for some of your workloads.

We’ll define a function, load_csv, that returns a pandas DataFrame for a given NYC taxi file path. Then we’ll call it for each of the 2019 files and create a Dask DataFrame with dd.from_delayed.

import pandas as pd
import dask
import dask.dataframe as dd
import s3fs

# Anonymous S3 access - the same thing storage_options={'anon': True}
# did for dd.read_csv above
s3 = s3fs.S3FileSystem(anon=True)

@dask.delayed
def load_csv(file):
    # Runs on a worker and returns a regular pandas DataFrame for one file
    df = pd.read_csv(
        s3.open(file, mode='rb'),
        parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']
    )
    return df

dfs = []
for f in s3.glob('s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv'):
    df = load_csv(f)
    dfs.append(df)
taxi_delayed = dd.from_delayed(dfs)
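
dd.from_delayed still doesn’t read anything - it just wires the delayed calls into a task graph, with one partition per file. As a quick sanity check (this does trigger all the delayed loads, so it takes a while), you can confirm it matches the DataFrame we built with dd.read_csv:

# One partition per monthly file
print(taxi_delayed.npartitions)

# Triggers every load_csv call across the cluster
len(taxi_delayed)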