Snowflake is the most popular data warehouse among our Saturn users. This article covers efficient ways to load Snowflake data into Dask so you can do non-SQL operations (think machine learning) at scale.
First, some basics: the standard way to load Snowflake data into pandas:
```python
import snowflake.connector
import pandas as pd

ctx = snowflake.connector.connect(
    user='YOUR_USER',
    password='YOUR_PASSWORD',
    account='YOUR_ACCOUNT'
)
query = "SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER"
pd.read_sql(query, ctx)
```
Snowflake recently introduced much faster methods for this operation, `fetch_pandas_all` and `fetch_pandas_batches`, which leverage Arrow:
```python
cur = ctx.cursor()
cur.execute(query)
df = cur.fetch_pandas_all()
```
`fetch_pandas_batches` returns an iterator, but since we're going to focus on loading the data into a distributed dataframe (pulling from multiple machines), we're going to set up our query to shard the data and call `fetch_pandas_all` on our workers.
What is Snowflake good for?
It can be very tempting to rip all the data out of Snowflake so that you can work with it in Dask. That definitely works; however, Snowflake is going to be much faster at applying SQL-like operations to the data. Snowflake stores the data and has highly optimized routines to get every ounce of performance out of your query. These examples pretend we're loading the entire table into Dask. In practice, you will probably have some SQL query that performs the SQL-like transformations you care about, and you'll load the result set into Dask for the things Dask is good at (certain types of feature engineering, and machine learning). Saturn Cloud has native integration with Dask and Snowflake, so check that out if this is interesting to you.
How does Dask load data?
You can think of a Dask dataframe as a giant pandas dataframe that has been chopped up and scattered across a bunch of computers. When we load data from Snowflake (assuming that the data is large), it's not efficient to load all of it onto one machine and then scatter it out to the cluster. Instead, we're going to have every machine in your Dask cluster load a partition (a small slice) of the data.
We need a way to split the data into small partitions so that we can load it into the cluster. Data in SQL doesn't necessarily have any natural ordering, so you can't just throw the first 10k rows into one partition and the second 10k rows into another; the partitioning has to be based on a column of the data. For example, you can partition the data by a date field, or you can create a row number by adding an identity column to your Snowflake table.
Once you've decided which column you want to partition your data on, it's important to set up data clustering on the Snowflake side, because every single worker is going to ask for a small slice of the data with a query like:
```sql
select * from table where id < 20000 and id >= 10000
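The per-worker queries can be generated in plain Python. Here's a minimal sketch (the table and column names are placeholders) that renders one such range-filtered query per partition boundary:

```python
def shard_queries(table, column, boundaries):
    """Render one range-filtered query per (start, end) partition boundary."""
    return [
        f"select * from {table} where {column} >= {start} and {column} < {end}"
        for start, end in boundaries
    ]

queries = shard_queries("customer", "id", [(0, 10000), (10000, 20000)])
```

Each query covers a half-open range, so adjacent partitions never overlap or drop rows.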
We aren't going to use `read_sql_table` from the Dask library here. I prefer to have more control over how we load the data from Snowflake, and we want to call `fetch_pandas_all`, which is a Snowflake-specific function and therefore not supported by `read_sql_table`.
```python
import snowflake.connector
from dask import delayed
from dask.dataframe import from_delayed

@delayed
def load(connection_info, query, start, end):
    conn = snowflake.connector.connect(**connection_info)
    cur = conn.cursor()
    # query should contain placeholders for the partition bounds,
    # e.g. "... where id >= %s and id < %s"
    cur.execute(query, (start, end))
    return cur.fetch_pandas_all()

ddf = from_delayed([load(connection_info, query, st, ed) for st, ed in partitions])
ddf = ddf.persist()
```
This code assumes that `partitions` is a list of start/end boundaries, for example:
```python
partitions = [(0, 10000), (10000, 20000), ...]
```
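If your partition column is a dense row number, you can generate this list from the total row count. A small sketch:

```python
def make_partitions(n_rows, chunk_size):
    """Cover [0, n_rows) with half-open (start, end) ranges of chunk_size."""
    return [
        (start, min(start + chunk_size, n_rows))
        for start in range(0, n_rows, chunk_size)
    ]

partitions = make_partitions(25000, 10000)
# → [(0, 10000), (10000, 20000), (20000, 25000)]
```

Note the final partition is allowed to be smaller than `chunk_size`, so every row is covered exactly once.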
`delayed` is a decorator that turns a Python function into one suitable for running on the Dask cluster. When you call it, instead of executing immediately, it returns a `Delayed` object that represents what the return value of the function will be. `from_delayed` takes a list of these `Delayed` objects and concatenates them into a giant dataframe.
These are advanced concepts, but I urge you to read this section; it can save you a lot of time and headache from running out of memory on your workstation. Don't assume that just because Snowflake says a dataset is 20GB, it will be 20GB when you load it into pandas. The in-memory representation is always much larger, though you can do better by being disciplined about data types.
Diagnosing Memory Usage
`df.memory_usage(deep=True)` is a good way to understand how much memory each column is using. This can help you understand what benefit you are getting from converting your data to appropriate dtypes.
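A toy illustration of the kind of asymmetry to look for (the column names here are made up):

```python
import pandas as pd

df = pd.DataFrame({
    "id": range(1000),
    "band": ["0-$40,000"] * 1000,  # repetitive Python strings
})
usage = df.memory_usage(deep=True)
# the object column dwarfs the integer column, because each
# Python string carries per-object overhead
```

With `deep=True`, pandas counts the actual string objects rather than just the 8-byte pointers to them.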
Python strings have roughly 40 bytes of overhead each. That doesn't sound like a lot, but if you have a billion strings, it can add up quickly. The newer pandas `StringDtype` can help here:
```python
df['column'] = df['column'].astype(pd.StringDtype())
```
Many string and numerical fields are actually categorical. Take a column named “Household Income”. Instead of a numerical value, you usually get data in bands, like “0-$40,000” or “more than $100,000”.
As a general guideline, I usually look for columns where the ratio of the number of unique values to the number of rows is less than 1%.
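If you already have a sample of the data in pandas, that 1% rule of thumb can be scanned for directly. A sketch (the dataframe and threshold are illustrative):

```python
import pandas as pd

def categorical_candidates(df, threshold=0.01):
    """Columns whose unique-value ratio falls below the threshold."""
    return [
        col for col in df.columns
        if df[col].nunique() / len(df) < threshold
    ]

sample = pd.DataFrame({
    "income_band": ["0-$40,000", "more than $100,000"] * 500,
    "id": range(1000),
})
candidates = categorical_candidates(sample)  # → ['income_band']
```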
In pandas, this is the relevant conversion:
```python
df['column'] = df['column'].astype("category")
```
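The savings can be dramatic for low-cardinality columns. A quick sanity check with synthetic data:

```python
import pandas as pd

s = pd.Series(["0-$40,000"] * 100_000)
before = s.memory_usage(deep=True)
after = s.astype("category").memory_usage(deep=True)
# the categorical stores each distinct string once,
# plus a small integer code per row
```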
However, loading an entire column out of Snowflake just to compute the categorical dtype is probably not feasible. I recommend the following type of query to identify which columns are good candidates for categorical dtypes:
```sql
select count(distinct(col1)), count(distinct(col2)), ... from table
```
You can compare that result to the number of rows in your table in order to figure out which columns should be categorical.
Then, to figure out the unique values:
```sql
select distinct(col1) from table
```
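Since each worker converts its own partition, it's worth building an explicit `CategoricalDtype` from those distinct values so every partition ends up with the same category codes. A sketch with made-up income bands standing in for the query result:

```python
import pandas as pd

# values you'd get back from a distinct query like the one above
distinct_values = ["0-$40,000", "$40,000-$100,000", "more than $100,000"]
income_dtype = pd.CategoricalDtype(categories=distinct_values)

# any partition converted with this dtype shares the same codes
s = pd.Series(["0-$40,000", "more than $100,000"]).astype(income_dtype)
```

Without a shared dtype, two partitions containing different subsets of values would infer different categories, which complicates concatenation later.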
Putting it all together
Assume you've done some of the memory optimizations listed above: you've figured out which fields should be converted to `StringDtype` and which should be categoricals, and you have a dictionary called `dtypes` that maps column names to the dtype you wish to coerce each column to.
```python
import snowflake.connector
from dask import delayed
from dask.dataframe import from_delayed

@delayed
def load(connection_info, query, start, end, dtypes):
    conn = snowflake.connector.connect(**connection_info)
    cur = conn.cursor()
    cur.execute(query, (start, end))
    return cur.fetch_pandas_all().astype(dtypes)

ddf = from_delayed([load(connection_info, query, st, ed, dtypes) for st, ed in partitions])
ddf = ddf.persist()
```