Connecting to Snowflake from Saturn

Snowflake is a data platform built for the cloud that allows for fast SQL queries. This example shows how to query data in Snowflake and pull it into Saturn Cloud for data science work. We will rely on the Snowflake Connector for Python to connect and issue queries from Python code.

Packages

The images provided by Saturn Cloud come with the Snowflake Connector for Python installed. If you are building your own images and want to work with Snowflake, include snowflake-connector-python in your environment.
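For example, if you manage your image with pip, one way to add the connector (along with its optional pandas support, which the examples below rely on) might look like this; the exact mechanism depends on how you build your environment:

pip install "snowflake-connector-python[pandas]"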

Credentials

To avoid storing credentials directly in notebooks, we recommend sourcing them from environment variables using the Credentials feature in Saturn Cloud.

Create a credential for each of these variables:

  • SNOWFLAKE_ACCOUNT
  • SNOWFLAKE_USER
  • SNOWFLAKE_PASSWORD

You will need to restart any Jupyter Servers or Dask clusters if you add credentials while they are running. Then, from a notebook where you want to connect to Snowflake, you can read in the credentials and provide any additional connection arguments:

import os
import snowflake.connector

conn = snowflake.connector.connect(
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
    warehouse='MY_WAREHOUSE',
    database='MY_DATABASE',
    schema='MY_SCHEMA',
)
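The warehouse, database, and schema names above are placeholders to replace with your own. As a quick sanity check that the connection works, you can run a simple query, for example:

cur = conn.cursor()
cur.execute("SELECT CURRENT_VERSION()")
print(cur.fetchone()[0])  # prints the Snowflake version string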

Snowflake Connector with Pandas

The snowflake-connector-python package has fetch_pandas_all() and fetch_pandas_batches() methods that utilize Arrow for fast data exchange. Using sample data available in Snowflake, we’ll load some data into a pandas DataFrame.

import os
import snowflake.connector

conn = snowflake.connector.connect(
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
    database='SNOWFLAKE_SAMPLE_DATA',
    schema='TPCH_SF1',
)

query = """
SELECT *
FROM customer
WHERE c_custkey <= 20000
"""
cur = conn.cursor().execute(query)
df = cur.fetch_pandas_all()
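If the result set is too large to comfortably hold in memory all at once, fetch_pandas_batches() (mentioned above) instead yields the result as a series of smaller pandas DataFrames. A minimal sketch:

cur = conn.cursor().execute(query)
total_rows = 0
for batch in cur.fetch_pandas_batches():
    # each batch is a pandas DataFrame holding a chunk of the result set
    total_rows += len(batch)
print(total_rows)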

Snowflake Connector with Dask

When data sizes exceed what can fit into a single pandas DataFrame, the snowflake-connector-python package can also be used to read larger datasets into Dask DataFrames. This is accomplished by telling Dask to load different pieces of your data in parallel.

We need to set up a query template containing bind parameters so that Dask issues multiple queries, each extracting a slice of the data based on a partitioning column. These slices will become the partitions in a Dask DataFrame.

query = """
SELECT *
FROM customer
WHERE c_custkey BETWEEN %s AND %s
"""

It’s important to pick a column that evenly divides your data, like a row ID or a uniformly distributed timestamp. Otherwise one query may take much longer to execute than the others. We then use a dask.delayed function to execute this query multiple times in parallel, once per partition. Note that we put our Snowflake connection information in a dict called conn_info so that we can reference it multiple times.

import dask

conn_info = {
    "account": os.environ['SNOWFLAKE_ACCOUNT'],
    "user": os.environ['SNOWFLAKE_USER'],
    "password": os.environ['SNOWFLAKE_PASSWORD'],
    "database": 'SNOWFLAKE_SAMPLE_DATA',
    "schema": 'TPCH_SF1',
}

@dask.delayed
def load(conn_info, query, start, end):
    with snowflake.connector.connect(**conn_info) as conn:
        cur = conn.cursor().execute(query, (start, end))
        return cur.fetch_pandas_all()

We can now call this load() function multiple times and convert the results into a Dask DataFrame using dask.dataframe.from_delayed().

import dask.dataframe as dd

results = [
    load(conn_info, query, 0, 10000),
    load(conn_info, query, 10001, 20000),
    load(conn_info, query, 20001, 30000),
]
ddf = dd.from_delayed(results)
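Note that dd.from_delayed() is lazy: no data is read from Snowflake until you trigger a computation on the DataFrame, for example:

print(len(ddf))  # triggers the queries and counts rows across all partitions
ddf.head()       # inspect the first few rows of the first partition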

The start and end values were hard-coded in the above example, but normally you would write a query to determine the partition boundaries based on your data. For example, with our customer table, we know that the c_custkey column is an auto-incrementing, non-null ID column (the cardinality of the column is equal to the number of rows in the table). We can write a function that determines the appropriate start and end values for a desired number of partitions, then use those results to create the Dask DataFrame:

def get_partitions(table, id_col, num_partitions=100):
    with snowflake.connector.connect(**conn_info) as conn:
        part_query = f"SELECT MAX({id_col}) FROM {table}"
        part_max = conn.cursor().execute(part_query).fetchall()[0][0]

        # split the ID range into roughly equal, inclusive (start, end) ranges,
        # clamping the final bound so the last rows are not dropped
        inc = part_max // num_partitions
        parts = [(i + 1, min(i + inc, part_max)) for i in range(0, part_max, inc)]
        return parts

parts = get_partitions('customer', 'c_custkey')

ddf = dd.from_delayed(
    [load(conn_info, query, part[0], part[1]) for part in parts]
)
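It can be worth inspecting the generated ranges before kicking off the queries. With the 150,000-row customer table and the default of 100 partitions, the output should look something like this:

print(len(parts))  # 100
print(parts[:3])   # [(1, 1500), (1501, 3000), (3001, 4500)]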

As long as the full dataset fits into the memory of your cluster, you can persist the DataFrame to ensure the Snowflake queries only execute once. Otherwise, they will execute each time you trigger computation on the DataFrame.

ddf = ddf.persist()
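On a distributed cluster, persist() returns immediately while the partitions load in the background. If you want to block until all of the Snowflake queries have finished, you can pair it with dask.distributed.wait():

from dask.distributed import wait

ddf = ddf.persist()
wait(ddf)  # blocks until every partition has been loaded from Snowflake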

To see another example of loading Snowflake data into Dask, check out the “examples-cpu” project in your Saturn account. The examples-cpu/snowflake/snowflake-dask.ipynb notebook shows an example of partitioning based on a timestamp column rather than a row ID.
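As a rough, hypothetical sketch of that approach (not the exact code from the notebook), the bind parameters can just as easily be dates. Here we partition the TPC-H orders table by month on its o_orderdate column:

import pandas as pd

date_query = """
SELECT *
FROM orders
WHERE o_orderdate >= %s AND o_orderdate < %s
"""

# one (start, end) pair per month; TPC-H order dates span 1992 through 1998
month_starts = pd.date_range("1992-01-01", "1999-01-01", freq="MS")
date_parts = [
    (s.date(), e.date()) for s, e in zip(month_starts[:-1], month_starts[1:])
]

ddf_orders = dd.from_delayed(
    [load(conn_info, date_query, start, end) for start, end in date_parts]
)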

Snowflake Connector with Dask and RAPIDS

To load data into a RAPIDS cuDF DataFrame, follow the same process as above for Dask, but convert the pandas DataFrame into a cuDF DataFrame inside the load() function. Then use the dask_cudf package instead of dask.dataframe to assemble the partitions.

import cudf
import dask_cudf as cudd

@dask.delayed
def load(conn_info, query, start, end):
    with snowflake.connector.connect(**conn_info) as conn:
        cur = conn.cursor().execute(query, (start, end))
        df = cur.fetch_pandas_all()
        return cudf.from_pandas(df)

cuddf = cudd.from_delayed(
    [load(conn_info, query, part[0], part[1]) for part in parts]
)
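As with the CPU example, you can persist the result so the Snowflake queries only run once; this assumes your Dask cluster workers have GPUs with RAPIDS (cudf) available:

cuddf = cuddf.persist()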