Snowflake is the most popular data warehouse among our Saturn users. This article will cover efficient ways to load Snowflake data into Dask so you can do non-sql operations (think machine learning) at scale.

The Basics

First, some basics, the standard way to load Snowflake data into Pandas:

import snowflake.connector
import pandas as pd

ctx = snowflake.connector.connect(
    user='YOUR_USER',
    password='YOUR_PASSWORD',
    account='YOUR_ACCOUNT'
)
query = "SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER"
pd.read_sql(query, ctx)

Snowflake recently introduced a much faster method for this operation, fetch_pandas_all, and fetch_pandas_batches which leverages Arrow

cur = ctx.cursor()
cur.execute(query)
df = cur.fetch_pandas_all()

fetch_pandas_batches returns an iterator, but since we’re going to focus on loading this into a distributed dataframe (pulling from multiple machines), we’re going to setup our query to shard the data, and use fetch_pandas_all on our workers.

What is Snowflake good for?

It can be very tempting to rip all the data out of snowflake so that you can work with it in Dask. That definitely works, however snowflake is going to be much faster at applying sql-like operations to the data. Snowflake stores the data and has highly optimized routines to get every ounce of performance out of your query. These examples will pretend like we’re loading the entire data into Dask – in your case, you will probably have some SQL query, which performs the sql-like transformations you care about, and you’ll be loading the result set into Dask, for the things that Dask is good at (possibly some types of feature engineering, and machine learning). Saturn Cloud has native integration with Dask and Snowflake, so check that out if this is interesting to you.

How does Dask load data?

You can think about a Dask dataframe as a giant Pandas dataframe, that has been chopped up an scattered across a bunch of computers. When we are loading data from Snowflake (assuming that the data is large), it’s not efficient to load all the data on one machine, and then scatter that out to your cluster. We are going to focus on having all machines in your Dask cluster load a partition (a small slice) of your data.

Data Clustering

We need a way to split the data into little partitions so that we can load it into the cluster. Data in SQL doesn’t necessarily have any natural ordering. You can’t just say that you’re going to throw the first 10k rows into one partition, and the second 10k rows into another partition. That partitioning has to be based on a column of the data. For example you can partition the data by a date field. Or you can create a row number by adding an identity column into your Snowflake table.

Once you’ve decided what column you want to partition your data on, it’s important to setup data clustering on the snowflake side. Every single worker is going to ask for a small slice of the data. Something like

select * from table where id < 20000 and id >= 10000
If you don’t setup data clustering, every single query is going to trigger a full table scan on the resulting database (I’m probably overstating the problem, but without data clustering, performance here can be quite bad)

Load it!

We aren’t going to use read_sql_table from the dask library here. I prefer to have more control over how we load the data from Snowflake, and we want to call fetch_pandas_all, which is a Snowflake specific function, and therefore not supported with read_sql_table

import snowflake.connector
from dask.dataframe import from_delayed
from dask.distributed import delayed


@delayed
def load(connection_info, query, start, end):
    conn = snowflake.connector.connect(**connection_info)
    cur = conn.cursor()
    cur.execute(query, start, end)
    return cur.fetch_pandas_all()


ddf = from_delayed(*[load(connection_info, query, st, ed) for st, ed in partitions])
ddf.persist()

This code assumes that partitions is a list of starting/ending partitions, for example

partitions = [(0, 10000), (10000, 20000), ...]

delayed is a decorator that turns a Python function into a function suitable for running on the dask cluster. When you execute it, instead of executing, it returns a delayed result that represents what the return value of the function will be. from_delayed takes a list of these delayed objects, and concatenates them into a giant dataframe.

Memory Optimizations

This is advanced concepts, but I urge you to read this section, it can save you alot of time and headache from running out of memory on your workstation. Don’t assume that just because Snowflake says that a dataset is 20GB that it will be 20GB when you load it into pandas. The pandas in memory representation is always much larger, though you can get better by being good about data types.

Diagnosing Memory Usage

df.memory_usage(deep=True) is a good way to understand how much memory each column is using. This can help you understand what benefit you are getting from converting your data to appropriate DTypes.

StringDType

Python strings have roughly 40 bytes of overhead. That doesn’t sound like a lot, but if you have a billion strings, it can add up quickly. The new StringDType can help here.

df['column'] = df['column'].astype(pd.StringDType())

Categorical DTypes

Many string and numerical fields are actually categorical. Take a column named “Household Income”. Instead of a numerical value, you usually get data in bands, like “0-$40,000” or “more than $100,000”.

As a general guideline, I usually look for columns where the ratio of the number of unique values, to the number of rows is less than 1%.

In pandas this is the relevant code.

df['column'] = df['column'].astype("category")

However, I’m assuming that loading the entire column out of snowflake in order to compute the categorical dtype is not feasible. I recommend the following type of query to identify which columns are good candidates for categorical dtypes

select 
  count(distinct(col1)),
  count(distinct(col2)),
  ...
 from table

You can compare that result to the number of rows in your table in order to figure out which columns should be categorical.

Then to figure out the unique values

select distinct(col1) from table

Putting it all together

Assuming you’ve done some of the memory optimizations listed above, and you’ve figured out some fields that should be converted to StringDType, some that should be categoricals. And assuming that you have a dictionary called dtypes which is a mapping of column names, to the dtype you wish to coerce the result to.

import snowflake.connector
from dask.dataframe import from_delayed
from dask.distributed import delayed


@delayed
def load(connection_info, query, start, end, dtypes):
    conn = snowflake.connector.connect(**connection_info)
    cur = conn.cursor()
    cur.execute(query, start, end)
    return cur.fetch_pandas_all().astype(dtypes)


ddf = from_delayed(*[load(connection_info, query, st, ed) for st, ed in partitions])
ddf.persist()

Thanks for reading. If you’re interested in using Dask with Snowflake, then I recommend you check out Saturn Cloud.