Prefect Cloud Concepts

Learn more about the available orchestration options with Prefect Cloud

Overview

This page describes the integration between Saturn Cloud and Prefect Cloud. For a step-by-step tutorial using this integration, see “Using Prefect Cloud with Saturn Cloud”.

The same team that maintains the prefect core library runs a cloud service called Prefect Cloud. Prefect Cloud is a hosted, high-availability, fault-tolerant service that handles all the orchestration responsibilities for running data pipelines.

Prefect Orchestration Concepts

Before continuing, it’s important to have a high-level understanding of the key components in Prefect Cloud. For much, much more detail on these topics, see the Prefect “Orchestration” docs.

Flows

A “flow” is a container for multiple tasks which understands the relationship between those tasks. Tasks are arranged in a directed acyclic graph (DAG), which just means that there is always a notion of which tasks depend on which other tasks, and those dependencies are never circular.
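To make the DAG idea concrete, here is a minimal sketch that uses only the Python standard library (`graphlib`, Python 3.9+), not the prefect API. The task names are hypothetical. It shows that an acyclic set of dependencies always has a valid execution order, and that a circular dependency is rejected.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}


def execution_order(deps):
    """Return one valid order in which the tasks could run."""
    return list(TopologicalSorter(deps).static_order())


def is_acyclic(deps):
    """Return True if the dependencies contain no circular references."""
    try:
        execution_order(deps)
        return True
    except CycleError:
        return False


order = execution_order(dependencies)
print(order)
```

In this sketch, `extract` always comes first and `load` always comes last, because every other task sits between them in the graph.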

For more, see the prefect docs.

Flow Runs

Each time the code in a flow is executed, that represents one “flow run”.

Prefect Core Server

A service that keeps track of all your flows and knows how to run them. This server also has responsibility for keeping track of schedules. If you set up a flow to run once an hour, Prefect Core Server will make sure that that happens.

Flow Versions

When a flow changes, a new “flow version” is created. Example changes include:

  • some tasks have been added or removed
  • the dependencies between tasks have changed
  • the flow is using a different execution mechanism (like a Dask cluster instead of a local Python process)

Prefect Core Server keeps track of all these versions, and knows to link all versions of the same flow into one “flow group”.
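As a rough illustration of versions and flow groups, the sketch below keeps a list of versions per flow name. It is not Prefect's implementation: the `FlowRegistry` class is hypothetical, and it assumes (as a simplification) that a new version is recorded whenever the registered description differs from the latest one.

```python
from collections import defaultdict


class FlowRegistry:
    """Hypothetical stand-in for the server's bookkeeping of flow versions."""

    def __init__(self):
        # flow name -> list of descriptions, one per version (the "flow group")
        self._groups = defaultdict(list)

    def register(self, name, description):
        """Record a new version if the description changed; return the version number."""
        versions = self._groups[name]
        if not versions or versions[-1] != description:
            versions.append(description)
        return len(versions)


registry = FlowRegistry()
v1 = registry.register(
    "test-flow", {"tasks": ["multiply", "subtract"], "executor": "LocalExecutor"}
)
# same flow, different execution mechanism: a new version in the same group
v2 = registry.register(
    "test-flow", {"tasks": ["multiply", "subtract"], "executor": "DaskExecutor"}
)
# a different flow starts its own group
v_other = registry.register(
    "other-flow", {"tasks": ["load"], "executor": "LocalExecutor"}
)
```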

Prefect Agents

A Prefect Agent is a small service responsible for running flows and reporting their logs and statuses back to Prefect Core Server. Prefect Agents are always “pull-based”: they are configured to point at an instance of Prefect Core Server, and every few milliseconds they ask it, “hey, is there anything you want me to do?”

When Prefect Core Server responds and says “yes, please run this flow”, the agent is responsible for inspecting the details of the flow and then kicking off a flow run.

It looks at these details:

  • storage: where can the flow code be retrieved from?
    • In most cases, “the flow” means a binary file which can be turned into a Python object (prefect.Flow) using cloudpickle
  • run config: what infrastructure needs to be set up to run the flow?
  • executor: what engine will be used to run all the Python code in the flow?
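The pull-based pattern and the details an agent inspects can be sketched with the standard library alone. Everything here is hypothetical (`FakeServer`, `FakeAgent`, the dictionary layout), and `pickle` stands in for cloudpickle. A real agent would also provision infrastructure and start the executor, rather than just recording their names.

```python
import pickle
from collections import deque


class FakeServer:
    """Stand-in for the orchestration server: holds a queue of requested flow runs."""

    def __init__(self):
        self._pending = deque()

    def request_run(self, flow_name):
        self._pending.append(flow_name)

    def get_work(self):
        # Called by the agent; returns None when there is nothing to do.
        return self._pending.popleft() if self._pending else None


class FakeAgent:
    """Stand-in for an agent that polls the server and kicks off flow runs."""

    def __init__(self, server, storage):
        self.server = server
        self.storage = storage  # flow name -> serialized flow description
        self.started = []

    def poll_once(self):
        """Ask the server for work once; return True if a flow run was started."""
        flow_name = self.server.get_work()
        if flow_name is None:
            return False
        # storage: turn the stored bytes back into a Python object
        flow = pickle.loads(self.storage[flow_name])
        # run config / executor: only recorded here, not actually provisioned
        self.started.append((flow["name"], flow["run_config"], flow["executor"]))
        return True


storage = {
    "test-flow": pickle.dumps(
        {"name": "test-flow", "run_config": "KubernetesRun", "executor": "DaskExecutor"}
    )
}
server = FakeServer()
agent = FakeAgent(server, storage)

idle = agent.poll_once()  # nothing has been requested yet
server.request_run("test-flow")
busy = agent.poll_once()  # the agent picks up the requested run
```

The server never contacts the agent in this design. It only answers when asked, which is what “pull-based” means.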

Saturn Cloud + Prefect Cloud Architecture

Using Saturn Cloud and Prefect Cloud together looks like this:

  1. Using credentials from your Prefect Cloud account, you create an Agent running in Saturn Cloud.
    • NOTE: Saturn does not charge you for this.
  2. You create a Saturn Cloud “project” which defines all the dependencies your code needs.
  3. In a Jupyter server with all those dependencies set up, you write flow code in Python using the prefect library.
  4. In your Python code, you use the prefect-saturn library to “register” your flow with Saturn, and the prefect library to register it with Prefect Cloud. Your flow will be automatically labeled to match with Prefect agents running in your Saturn cluster.
  5. prefect-saturn adds the following features to your flow by default:
    • storage: Webhook
    • run config: KubernetesRun
    • executor: DaskExecutor, using a prefect_saturn.SaturnCluster
    • labels: saturn-cloud, webhook-flow-storage, <YOUR_CLUSTER_DOMAIN>
  6. When Prefect Cloud tells your Prefect Agent in Saturn to run the flow, Saturn creates a kubernetes job to run the flow.

Features of this Design

  1. All flow runs for one flow run in the same Dask cluster.
  2. Flow runs from different flows run in their own Dask clusters.
  3. All of your sensitive data, code, and credentials stay within Saturn. Prefect Cloud only gets a minimal description of the flow, without any sensitive information.

Division of Responsibilities

In using this integration, you’ll write code with the prefect library which talks to Saturn Cloud and Prefect Cloud. Their responsibilities are as follows:

  • prefect library
    • describe the work to be done in a flow
    • tell Prefect Cloud about the flow, including when to run it (on a schedule? on demand?)
    • store that flow somewhere so it can be retrieved and run later
  • Saturn Cloud
    • provide a hosted Jupyter Lab experience where you can author and test flows, and a library for easily deploying them (prefect-saturn)
    • run an Agent that checks Prefect Cloud for new work
    • when Prefect Cloud says “yes run something”, retrieve flows from storage and run them
    • automatically start up a flow execution environment (a single node or a distributed Dask cluster) to run your flow, with the following guarantees:
      • is the size you asked for
      • has a GPU your code can take advantage of (if you requested one)
      • has the exact same environment as the Jupyter notebook where you wrote your code
      • has all of the code for your project (like other libraries you wrote)
      • has all of the credentials and secrets you’ve added (like AWS credentials or SSH keys)
    • display logs in the Saturn Cloud UI
    • send logs and task statuses back to Prefect Cloud, so you have all the information you need to react if anything breaks
  • Prefect Cloud
    • keep track of all the flows you’ve registered
    • when it’s time to run those flows (either on demand or based on a schedule), tell Agents to run them
    • display a history of all flow runs, including success / failure of individual tasks and logs from all tasks
    • allow you to kick off a flow on-demand using a CLI, Python library, or clicking buttons in the UI

Customizing Flow Orchestration

This section describes the different supported orchestration options for Prefect Cloud flows. You might find that you’re able to improve the performance or reliability of flow runs by customizing these options.

Option                                                    | Where Prefect tasks run | Tasks can access a multi-node Dask cluster
----------------------------------------------------------|-------------------------|-------------------------------------------
DaskExecutor with a Saturn Dask Cluster                   | Saturn Dask cluster     | yes
DaskExecutor with a Dask LocalCluster                     | local Dask cluster      | no
LocalExecutor, ResourceManager with a Saturn Dask Cluster | local Python process    | yes
LocalExecutor without any Dask                            | local Python process    | no
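The options in the table above, combined with the “When you should use this” criteria in the sections that follow, can be condensed into a small decision helper. This is only a sketch: the function name and its parameters are hypothetical, and it ignores startup time, which the later sections also treat as a deciding factor.

```python
def choose_orchestration_option(
    benefits_from_parallelism, large_datasets, uses_dask_directly=False
):
    """Return the name of the orchestration option that best fits a flow."""
    if uses_dask_directly:
        # tasks submit their own Dask work, so they run in a local process
        return "LocalExecutor, ResourceManager with a Saturn Dask Cluster"
    if large_datasets:
        # large data needs a multi-node cluster
        return "DaskExecutor with a Saturn Dask Cluster"
    if benefits_from_parallelism:
        # parallelism on a single machine is enough
        return "DaskExecutor with a Dask LocalCluster"
    return "LocalExecutor without any Dask"


print(choose_orchestration_option(benefits_from_parallelism=True, large_datasets=False))
```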

The examples below assume you’ve already created a simple Prefect flow and stored it in a variable flow and that you’ve already created a Prefect Cloud project called “test-project”.

from prefect import Flow, task

PREFECT_CLOUD_PROJECT_NAME = "test-project"

@task
def multiply(x, y):
    return x * y

@task
def subtract(x, y):
    return x - y

with Flow("test-flow") as flow:
    a = multiply(10, 4.5)
    b = subtract(a, 2.8)

NOTE: When options below talk about the cost of startup time for Dask clusters, those are worst-case expected startup times. You can eliminate this cost by starting Saturn Dask clusters in the UI before running a flow. See the FAQs for details.

DaskExecutor with a Saturn Dask Cluster

This is the default option when using prefect-saturn. In this setup, all tasks in your flow will be converted to Dask tasks and submitted to the same Saturn Cloud Dask cluster.

When you should use this

  • if your flow would benefit from parallelization
  • if your flow needs to work with large datasets
  • if up to 15 minutes of setup time at the beginning of flow execution is acceptable

How to use this

from prefect_saturn import PrefectCloudIntegration

# tell Saturn Cloud about the flow
integration = PrefectCloudIntegration(
    prefect_cloud_project_name=PREFECT_CLOUD_PROJECT_NAME
)
flow = integration.register_flow_with_saturn(
    flow=flow,
    dask_cluster_kwargs={
        "n_workers": 3,
        "worker_size": "xlarge"
    }
)

# tell Prefect Cloud about the flow
flow.register()

DaskExecutor with a Dask LocalCluster

If your flow would benefit from parallelism but doesn’t need the full power of a distributed Dask cluster, you might want to use a LocalCluster instead. Flow runs using this configuration will start executing sooner than those that require a distributed Dask cluster.

When you should use this

  • if your flow would benefit from parallelization
  • if your flow does not need to work with large datasets
  • if up to 5 seconds of setup time at the beginning of flow execution is acceptable

How to use this

from distributed import LocalCluster
from prefect.executors import DaskExecutor
from prefect_saturn import PrefectCloudIntegration

# tell Saturn Cloud about the flow
integration = PrefectCloudIntegration(
    prefect_cloud_project_name=PREFECT_CLOUD_PROJECT_NAME
)
flow = integration.register_flow_with_saturn(flow=flow)

# override the executor chosen by prefect-saturn
flow.executor = DaskExecutor(
    cluster_class=LocalCluster,
    cluster_kwargs={
        "n_workers": 3,
        "threads_per_worker": 2
    }
)

# tell Prefect Cloud about the flow
flow.register()

LocalExecutor, with a ResourceManager creating a Saturn Dask Cluster

If your flow code uses Dask operations directly, such as creating Dask DataFrames or training models with dask-ml, you may want to use a prefect ResourceManager. In this setup, flow tasks execute in the main process in the flow run job, but tasks are able to submit work to a distributed Dask cluster.

When you should use this

  • if your flow would benefit from parallelization
  • if your flow needs to work with large datasets
  • if your flow code uses Dask directly (e.g. you have tasks that manipulate Dask DataFrames)
  • if up to 15 minutes of setup time at the beginning of flow execution is acceptable

How to use this

Prefect’s ResourceManager is technically a special type of task within a flow, so to use this option you need to set up the resource manager when creating the flow, as shown below.

import dask.array as da
import prefect

from dask.distributed import Client
from dask_saturn import SaturnCluster
from prefect import Flow, resource_manager, task
from prefect.executors import LocalExecutor
from prefect_saturn import PrefectCloudIntegration


PREFECT_CLOUD_PROJECT_NAME = "test-project"


@resource_manager
class _DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

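    def setup(self):
        # start (or connect to) the Saturn Dask cluster and create a client,
        # which becomes the default Dask client for tasks in this block
        cluster = SaturnCluster(n_workers=self.n_workers)
        client = Client(cluster)
        return client

    def cleanup(self, client=None):
        # nothing to clean up here; the cluster is left running
        pass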


@task
def multiply(x, y):
    return x * y


@task
def subtract(x, y):
    return x - y


@task
def do_some_dask_stuff() -> None:
    logger = prefect.context.get("logger")
    X = da.random.random((100, 10))
    logger.info(f"mean of Dask array X is {X.mean().compute()}")


with Flow("test-flow") as flow:
    with _DaskCluster(n_workers=3) as resource:
        a = multiply(10, 4.5)
        b = subtract(a, 2.8)
        do_some_dask_stuff()


# tell Saturn Cloud about the flow
integration = PrefectCloudIntegration(
    prefect_cloud_project_name=PREFECT_CLOUD_PROJECT_NAME
)
flow = integration.register_flow_with_saturn(flow=flow)

# override the executor chosen by prefect-saturn
flow.executor = LocalExecutor()

# tell Prefect Cloud about the flow
flow.register()

LocalExecutor without any Dask

If your flow wouldn’t benefit much from parallelization, you aren’t getting much benefit from using Dask. In this situation, you can reduce the total runtime of flow runs by using a LocalExecutor.

When you should use this

  • if your flow would not benefit from parallelization
  • if your flow does not need to work with large datasets
  • if flow runs need to start executing within 5 seconds of a flow run being triggered by Prefect Cloud

How to use this

from prefect.executors import LocalExecutor
from prefect_saturn import PrefectCloudIntegration

# tell Saturn Cloud about the flow
integration = PrefectCloudIntegration(
    prefect_cloud_project_name=PREFECT_CLOUD_PROJECT_NAME
)
flow = integration.register_flow_with_saturn(flow=flow)

# override the executor chosen by prefect-saturn
flow.executor = LocalExecutor()

# tell Prefect Cloud about the flow
flow.register()

Frequently Asked Questions

Is the Dask cluster shut down after a flow run completes?

If you use the default orchestration setup from prefect-saturn, the Saturn Dask cluster that Prefect Cloud flows run on is not stopped after the flow run completes.

This is deliberate. Since all flow runs from the same flow run on the same cluster, stopping the cluster when one flow run completes could shut down a cluster that is still in use by another run of the same flow.

If you are confident that you won’t have overlapping runs of the same flow, you can tell prefect-saturn to stop the cluster after a flow completes by passing "autoclose": True into prefect_saturn.PrefectCloudIntegration.register_flow_with_saturn(). See the prefect-saturn docs for more details.
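As a sketch of what that might look like, the snippet below builds the cluster options as a dictionary. It assumes that "autoclose" is passed inside `dask_cluster_kwargs`, alongside options such as `n_workers` used in the earlier example; check the prefect-saturn docs to confirm. The registration call is left as a comment because it needs a Saturn Cloud environment to run.

```python
# Assumption: "autoclose" belongs in dask_cluster_kwargs, next to the
# other cluster options shown earlier on this page.
dask_cluster_kwargs = {
    "n_workers": 3,
    "worker_size": "xlarge",
    "autoclose": True,  # stop the cluster when a flow run completes
}

# With `integration` and `flow` defined as in the earlier examples:
# flow = integration.register_flow_with_saturn(
#     flow=flow,
#     dask_cluster_kwargs=dask_cluster_kwargs,
# )
```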

Why does it sometimes take a few minutes for my flow to start executing?

If you use an orchestration option that involves a Saturn Dask cluster and that Dask cluster isn’t currently running when the flow run starts, Saturn will start the cluster automatically for you. This can take a few minutes.

Startup times vary based on factors like:

  • how many workers you’ve selected and the size of those workers
  • the size of the container image used for workers, and whether or not that image is already cached on the node(s) workers get scheduled onto
  • time to complete any custom steps you’ve set up in the project’s start script

You can avoid this added setup time by starting the flow’s Dask cluster directly from the Saturn UI before any flow runs start. You should also find that by default, the cluster will be left up after a flow run completes. So if you trigger two runs of the same flow in quick succession, the second flow run should be faster and shouldn’t have to wait for the Dask cluster to start.



