Scheduled Data Pipelines

Scheduled data pipelines run on a cron schedule. They are typically (but do not have to be) Prefect flows, and they can also be paired with a Dask cluster.

Overview

Prefect is an open source workflow orchestration tool made for data-intensive workloads. In that project’s language, you chain together a series of “tasks” into a “flow”. If you are new to Prefect, go read this introduction and then come back to this section.

We’ll cover how to:

  • develop a Prefect flow in a notebook
  • use Dask to speed up a Prefect flow
  • schedule a flow to run in a Saturn Deployment

Dependencies

The images we ship with Saturn contain everything you need to get started with Prefect. If you are building your own images, make sure to include the following:

  • dask-saturn
  • prefect
  • dask
  • bokeh

dask-saturn is available on public PyPI, but must be installed from the saturncloud anaconda.org channel.
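
If you are building your own image, one quick way to confirm that the packages above are present is to ask Python whether each one can be found. The sketch below uses only the standard library. The helper name `find_missing` is hypothetical, and the import names (for example `dask_saturn` for the `dask-saturn` package) are assumed to follow the usual hyphen-to-underscore convention.

```python
from importlib.util import find_spec

# import names assumed for the packages listed above
REQUIRED_MODULES = ["dask_saturn", "prefect", "dask", "bokeh"]


def find_missing(module_names):
    """Return the names in `module_names` that cannot be found for import."""
    return [name for name in module_names if find_spec(name) is None]


missing = find_missing(REQUIRED_MODULES)
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are available.")
```

Running this inside the image before deploying gives an early signal that a dependency was left out of the build.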

A Prefect example in the Notebook

https://github.com/saturncloud/examples/blob/main/examples/examples-cpu/prefect/prefect-scheduled-scoring.ipynb

Scheduled Scoring with Prefect on Saturn Cloud

This notebook contains sample code to take a Prefect flow and distribute its work with a Dask cluster. The flow below mocks the process of measuring the effectiveness of a deployed statistical model.

Model Details

The data used for this example is the “Incident management process enriched event log” dataset from the UCI Machine Learning Repository. That dataset contains tickets from an IT support system, including characteristics like the priority of the ticket, the time it was opened, and the time it was closed.

This dataset can be used to solve a regression task:

Given the characteristics of a ticket, how long will it be until it is closed?

Define Tasks

Prefect refers to a workload as a “flow”, which comprises multiple individual units of work called “tasks”. From the Prefect docs:

A task is like a function: it optionally takes inputs, performs an action, and produces an optional result.

The goal of this notebook’s flow is to evaluate, on an ongoing basis, the performance of a model that predicts time-to-close for tickets in an IT support system.

That can be broken down into the following tasks:

  • get_trial_id(): assign a unique ID to each run
  • get_ticket_data_batch(): get a random set of newly-closed tickets
  • get_target(): given a batch of tickets, compute how long it took to close them
  • predict(): predict the time-to-close, using the heuristic “higher-priority tickets close faster”
  • evaluate_model(): compute evaluation metrics comparing predicted and actual time-to-close
  • get_trial_summary(): collect all evaluation metrics in one object
  • write_trial_summary(): write trial results somewhere
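
import json
import uuid
from datetime import datetime
from io import BytesIO
from zipfile import ZipFile

import numpy as np
import pandas as pd
import prefect
import requests
from prefect import Flow, Parameter, task
from sklearn.metrics import (
    mean_absolute_error,
    mean_squared_error,
    median_absolute_error,
    r2_score,
)


@task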
def get_trial_id() -> str:
    """
    Generate a unique identifier for this trial.
    """
    return str(uuid.uuid4())


@task
def get_ticket_data_batch(batch_size: int) -> pd.DataFrame:
    """
    Simulate the experience of getting a random sample of new tickets
    from an IT system, to test the performance of a model.
    """
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00498/incident_event_log.zip"
    resp = requests.get(url)
    zipfile = ZipFile(BytesIO(resp.content))
    data_file = "incident_event_log.csv"
    # _date_parser has to be a lambda or pandas won't convert dates correctly
    _date_parser = lambda x: pd.NaT if x == '?' else datetime.strptime(x, "%d/%m/%Y %H:%M")
    df = pd.read_csv(
        zipfile.open(data_file),
        parse_dates=[
            "opened_at",
            "resolved_at",
            "closed_at",
            "sys_created_at",
            "sys_updated_at"
        ],
        infer_datetime_format=False,
        converters={
            "opened_at": _date_parser,
            "resolved_at": _date_parser,
            "closed_at": _date_parser,
            "sys_created_at": _date_parser,
            "sys_updated_at": _date_parser
        },
        na_values = ['?']
    )
    df["sys_updated_at"] = pd.to_datetime(df["sys_updated_at"])
    # sample `batch_size` random rows (with replacement) to score
    rows_to_score = np.random.randint(0, df.shape[0], batch_size)
    return df.iloc[rows_to_score]


@task
def get_target(df):
    """
    Compute time-til-close on a data frame of tickets
    """
    time_til_close = (df['closed_at'] - df['sys_updated_at']) / np.timedelta64(1, 's')
    return time_til_close


@task
def predict(df):
    """
    Given an input data frame, predict how long it will be until the ticket is closed.
    For simplicity, using a super simple model that just says
    "high-priority tickets get closed faster".
    """
    seconds_in_an_hour = 60.0 * 60.0
    preds = df["priority"].map({
        "1 - Critical":   6.0 * seconds_in_an_hour,
        "2 - High":      24.0 * seconds_in_an_hour,
        "3 - Moderate": 120.0 * seconds_in_an_hour,
        "4 - Lower":    240.0 * seconds_in_an_hour,
    })
    default_guess_for_no_priority = 180.0 * seconds_in_an_hour
    preds = preds.fillna(default_guess_for_no_priority)
    return preds


@task
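def evaluate_model(y_true, y_pred, metric_name: str) -> float:
    """
    Compute one evaluation metric ("mae", "medae", "mse", or "r2")
    comparing actual and predicted time-to-close.
    """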
    metric_func_lookup = {
        "mae": mean_absolute_error,
        "medae": median_absolute_error,
        "mse": mean_squared_error,
        "r2": r2_score
    }
    metric_func = metric_func_lookup[metric_name]
    return metric_func(y_true, y_pred)


@task
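def get_trial_summary(trial_id: str, actuals, input_df: pd.DataFrame, metrics: dict) -> dict:
    """
    Collect the trial ID, evaluation metrics, and summary statistics
    of the target into one dictionary.
    """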
    out = {"id": trial_id}
    out["data"] = {
        "num_obs": input_df.shape[0],
        "metrics": metrics,
        "target": {
            "mean": actuals.mean(),
            "median": actuals.median(),
            "min": actuals.min(),
            "max": actuals.max()
        }
    }
    return out


@task(log_stdout=True)
def write_trial_summary(trial_summary: dict):
    """
    Write out a summary of the trial. Currently just logs it to the
    Prefect logger.
    """
    logger = prefect.context.get("logger")
    logger.info(json.dumps(trial_summary))
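
To see what the prediction and evaluation steps compute, the sketch below applies the same logic to three made-up tickets using only pandas, without Prefect. The ticket values are invented for illustration, and the mean absolute error is computed by hand rather than with scikit-learn.

```python
import pandas as pd

SECONDS_IN_AN_HOUR = 60.0 * 60.0

# three made-up tickets; the last one has no priority recorded
tickets = pd.DataFrame({
    "priority": ["1 - Critical", "2 - High", None],
    "sys_updated_at": pd.to_datetime(
        ["2016-03-01 08:00", "2016-03-01 09:00", "2016-03-01 10:00"]
    ),
    "closed_at": pd.to_datetime(
        ["2016-03-01 12:00", "2016-03-02 09:00", "2016-03-08 10:00"]
    ),
})

# same target as get_target(): seconds between last update and close
actuals = (tickets["closed_at"] - tickets["sys_updated_at"]).dt.total_seconds()

# same heuristic as predict(): map priority to a guess, with a fallback
preds = tickets["priority"].map({
    "1 - Critical": 6.0 * SECONDS_IN_AN_HOUR,
    "2 - High": 24.0 * SECONDS_IN_AN_HOUR,
    "3 - Moderate": 120.0 * SECONDS_IN_AN_HOUR,
    "4 - Lower": 240.0 * SECONDS_IN_AN_HOUR,
}).fillna(180.0 * SECONDS_IN_AN_HOUR)

# mean absolute error, reported in hours
mae_hours = (actuals - preds).abs().mean() / SECONDS_IN_AN_HOUR
print(f"MAE: {mae_hours:.2f} hours")
```

The three tickets took 4, 24, and 168 hours to close and the heuristic guesses 6, 24, and 180 hours, so the absolute errors are 2, 0, and 12 hours and the MAE is about 4.67 hours.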

Construct a Flow

Now that all of the task logic has been defined, the next step is to compose those tasks into a “flow”. From the Prefect docs:

A Flow is a container for Tasks. It represents an entire workflow or application by describing the dependencies between tasks.

Flows are DAGs, or “directed acyclic graphs.” This is a mathematical way of describing certain organizational principles:

  • A graph is a data structure that uses “edges” to connect “nodes.” Prefect models each Flow as a graph in which Task dependencies are modeled by Edges.
  • A directed graph means that edges have a start and an end: when two tasks are connected, one of them unambiguously runs first and the other one runs second.
  • An acyclic directed graph has no circular dependencies: if you walk through the graph, you will never revisit a task you’ve seen before.
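
As an illustration of these properties, the sketch below writes the dependencies between this notebook’s tasks as a plain dictionary and orders them with Python’s standard-library `graphlib` module (Python 3.9+). This is not how Prefect stores a flow internally; the dictionary is a hand-written assumption based on the task list above.

```python
from graphlib import CycleError, TopologicalSorter

# each task maps to the set of tasks whose output it needs
dependencies = {
    "get_trial_id": set(),
    "get_ticket_data_batch": set(),
    "get_target": {"get_ticket_data_batch"},
    "predict": {"get_ticket_data_batch"},
    "evaluate_model": {"get_target", "predict"},
    "get_trial_summary": {
        "get_trial_id",
        "get_ticket_data_batch",
        "get_target",
        "evaluate_model",
    },
    "write_trial_summary": {"get_trial_summary"},
}

# a valid run order: every task appears after the tasks it depends on
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)

# adding a circular dependency makes the graph invalid
cyclic = dict(dependencies, get_trial_id={"write_trial_summary"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```

Tasks with no path between them, such as `get_target` and `predict`, can appear in either order, which is also what allows them to run in parallel.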

Because we want this job to run on a schedule, the code below provides one additional argument to Flow(), a special “schedule” object. In this case, the code below says “run this flow every minute”.

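from datetime import timedelta

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

# `schedule` is not defined in the text above; this definition is an
# assumption matching the stated intent of "run this flow every minute"
schedule = Schedule(clocks=[IntervalClock(interval=timedelta(minutes=1))])

with Flow('ticket-model-evaluation', schedule=schedule) as flow: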
    batch_size = Parameter(
        'batch-size',
        default=1000
    )
    trial_id = get_trial_id()

    # pull sample data
    sample_ticket_df = get_ticket_data_batch(batch_size)

    # compute target
    actuals = get_target(sample_ticket_df)

    # get prediction
    preds = predict(sample_ticket_df)

    # compute evaluation metrics
    mae = evaluate_model(actuals, preds, "mae")
    medae = evaluate_model(actuals, preds, "medae")
    mse = evaluate_model(actuals, preds, "mse")
    r2 = evaluate_model(actuals, preds, "r2")

    # collect the trial summary in a dictionary
    trial_summary = get_trial_summary(
        trial_id=trial_id,
        input_df=sample_ticket_df,
        actuals=actuals,
        metrics={
            "MAE": mae,
            "MedAE": medae,
            "MSE": mse,
            "R2": r2
        }
    )

    # store trial summary
    trial_complete = write_trial_summary(trial_summary)

To execute the flow locally, run:

flow.run()

It’s a good idea to start locally, with a smaller subset of your data, before scaling up.

Scaling up with Dask Clusters

LocalCluster

A Dask LocalCluster is good for taking advantage of all the cores on your machine. If you only need 512 GB of RAM and 64 cores, spinning up a large instance and using a Dask LocalCluster is a great idea.

from prefect.engine.executors import DaskExecutor
flow.run(executor=DaskExecutor())
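
For a sense of what a local cluster does on its own, the sketch below starts a small Dask `LocalCluster` directly and farms out a trivial function to its workers. It does not involve Prefect. The function names `square` and `run_on_local_cluster` are hypothetical, and the worker counts are arbitrary small values chosen for a quick demonstration.

```python
from dask.distributed import Client, LocalCluster


def square(x):
    return x * x


def run_on_local_cluster(values, n_workers=2):
    """Square each value on a temporary LocalCluster and return the results."""
    # processes=False keeps the workers in threads of this process,
    # which is enough for a small demonstration
    with LocalCluster(
        n_workers=n_workers,
        threads_per_worker=1,
        processes=False,
        dashboard_address=None,  # skip the diagnostics dashboard
    ) as cluster, Client(cluster) as client:
        futures = client.map(square, values)
        return client.gather(futures)


results = run_on_local_cluster([1, 2, 3, 4])
print(results)
```

Both the cluster and the client are closed when the `with` block exits, which mirrors the way a temporary cluster is torn down once a flow run finishes.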

Multi-Node Cluster

from dask_saturn import SaturnCluster
from prefect.engine.executors import DaskExecutor
flow.run(executor=DaskExecutor(
   cluster_class=SaturnCluster,
   cluster_kwargs=dict(
       scheduler_class="xlarge",
       worker_class="16xlarge",
       nprocs=16,
       nthreads=4,
       n_workers=5,
       autoclose=True
   )))

This tells Prefect to create a SaturnCluster when it starts the flow, and to stop it (autoclose) when the flow is complete.

Deploy a Prefect flow in Saturn

Notebooks are great for rapid prototyping and interactive work, but since the flow needs to run on a schedule, it would be better to run it in an isolated environment that can be started, stopped, and scaled.

Saturn supports persistent and scheduled deployments. Persistent deployments are always on. To run this flow every minute, we recommend baking the schedule into the flow and using a persistent deployment. To run the flow every day, we recommend removing the schedule from the flow and using a scheduled deployment instead (running it on a cron schedule).
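
For reference, a standard five-field cron expression lists minute, hour, day of month, month, and day of week. The sketch below is a hypothetical helper, not part of Saturn or Prefect, that labels those fields. The two expressions are illustrative examples of an every-minute and a once-a-day schedule, and the 06:00 run time is an arbitrary choice.

```python
CRON_FIELDS = ["minute", "hour", "day_of_month", "month", "day_of_week"]


def label_cron_fields(expression: str) -> dict:
    """Split a five-field cron expression into named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


# "* * * * *" runs every minute; "0 6 * * *" runs once a day at 06:00
print(label_cron_fields("* * * * *"))
print(label_cron_fields("0 6 * * *"))
```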

See here for more details