Scheduled Data Pipelines
Overview
Prefect is an open-source workflow orchestration tool built for data-intensive workloads. In that project’s language, you chain together a series of “tasks” into a “flow”. If you are new to Prefect, go read this introduction and then come back to this section.
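As a minimal sketch of those two concepts (this assumes Prefect 0.x/1.x core, which the rest of this section uses; the task and flow names here are made up for illustration):
from prefect import task, Flow

@task
def say_hello() -> str:
    # a task is an individual unit of work
    return "hello"

with Flow("hello-flow") as flow:
    # tasks called inside the Flow context become steps in the flow
    say_hello()

flow.run()  # executes the flow once, locally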
We’ll cover:
- developing a Prefect flow in a notebook
- using Dask to speed up a Prefect flow
- scheduling a flow to run in a Saturn Deployment
Dependencies
The images we ship with Saturn contain everything you need to get started with Prefect. If you are building your own images, make sure to include the following:
- dask-saturn
- prefect
- dask
- bokeh
dask-saturn is available on public PyPI; when installing with conda, use the saturncloud channel on anaconda.org.
A Prefect example in the Notebook
Scheduled Scoring with Prefect on Saturn Cloud
This notebook contains sample code that takes a Prefect flow and distributes its work across a Dask cluster. The flow below mocks the process of measuring the effectiveness of a deployed statistical model.
Model Details
The data used for this example is the “Incident management process enriched event log” dataset from the UCI Machine Learning Repository. That dataset contains tickets from an IT support system, including characteristics like the priority of the ticket, the time it was opened, and the time it was closed.
This dataset can be used to solve a regression task:
Given the characteristics of a ticket, how long will it be until it is closed?
Define Tasks
Prefect refers to a workload as a “flow”, which comprises multiple individual pieces of work called “tasks”. From the Prefect docs:
A task is like a function: it optionally takes inputs, performs an action, and produces an optional result.
The goal of this notebook’s flow is to evaluate, on an ongoing basis, the performance of a model that predicts time-to-close for tickets in an IT support system.
That can be broken down into the following tasks:
- get_trial_id(): assign a unique ID to each run
- get_ticket_data_batch(): get a random set of newly-closed tickets
- get_target(): given a batch of tickets, compute how long it took to close them
- predict(): predict the time-to-close, using the heuristic “higher-priority tickets close faster”
- evaluate_model(): compute evaluation metrics comparing predicted and actual time-to-close
- get_trial_summary(): collect all evaluation metrics in one object
- write_trial_summary(): write trial results somewhere
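The task definitions below assume the following imports (a sketch of the notebook’s import block; every name here is used in the code that follows):
import json
import uuid
from datetime import datetime
from io import BytesIO
from zipfile import ZipFile

import numpy as np
import pandas as pd
import prefect
import requests
from prefect import Flow, Parameter, task
from sklearn.metrics import (
    mean_absolute_error,
    mean_squared_error,
    median_absolute_error,
    r2_score,
)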
@task
def get_trial_id() -> str:
"""
Generate a unique identifier for this trial.
"""
return str(uuid.uuid4())
@task
def get_ticket_data_batch(batch_size: int) -> pd.DataFrame:
"""
Simulate the experience of getting a random sample of new tickets
from an IT system, to test the performance of a model.
"""
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00498/incident_event_log.zip"
resp = requests.get(url)
zipfile = ZipFile(BytesIO(resp.content))
data_file = "incident_event_log.csv"
# _date_parser has to be a lambda or pandas won't convert dates correctly
_date_parser = lambda x: pd.NaT if x == '?' else datetime.strptime(x, "%d/%m/%Y %H:%M")
df = pd.read_csv(
zipfile.open(data_file),
parse_dates=[
"opened_at",
"resolved_at",
"closed_at",
"sys_created_at",
"sys_updated_at"
],
infer_datetime_format=False,
converters={
"opened_at": _date_parser,
"resolved_at": _date_parser,
"closed_at": _date_parser,
"sys_created_at": _date_parser,
"sys_updated_at": _date_parser
},
na_values = ['?']
)
df["sys_updated_at"] = pd.to_datetime(df["sys_updated_at"])
    # sample batch_size rows to simulate a batch of newly-closed tickets
    rows_to_score = np.random.randint(0, df.shape[0], batch_size)
    return df.iloc[rows_to_score]
@task
def get_target(df):
"""
Compute time-til-close on a data frame of tickets
"""
time_til_close = (df['closed_at'] - df['sys_updated_at']) / np.timedelta64(1, 's')
return time_til_close
@task
def predict(df):
"""
Given an input data frame, predict how long it will be until the ticket is closed.
For simplicity, using a super simple model that just says
"high-priority tickets get closed faster".
"""
seconds_in_an_hour = 60.0 * 60.0
preds = df["priority"].map({
"1 - Critical": 6.0 * seconds_in_an_hour,
"2 - High": 24.0 * seconds_in_an_hour,
"3 - Moderate": 120.0 * seconds_in_an_hour,
"4 - Lower": 240.0 * seconds_in_an_hour,
})
default_guess_for_no_priority = 180.0 * seconds_in_an_hour
preds = preds.fillna(default_guess_for_no_priority)
    return preds
@task
def evaluate_model(y_true, y_pred, metric_name: str) -> float:
metric_func_lookup = {
"mae": mean_absolute_error,
"medae": median_absolute_error,
"mse": mean_squared_error,
"r2": r2_score
}
metric_func = metric_func_lookup[metric_name]
return metric_func(y_true, y_pred)
@task
def get_trial_summary(trial_id: str, actuals, input_df: pd.DataFrame, metrics: dict) -> dict:
out = {"id": trial_id}
out["data"] = {
"num_obs": input_df.shape[0],
"metrics": metrics,
"target": {
"mean": actuals.mean(),
"median": actuals.median(),
"min": actuals.min(),
"max": actuals.max()
}
}
return out
@task(log_stdout=True)
def write_trial_summary(trial_summary: dict):
    """
    Write out a summary of the trial. Currently this just logs back to the
    Prefect logger.
    """
logger = prefect.context.get("logger")
logger.info(json.dumps(trial_summary))
Construct a Flow
Now that all of the task logic has been defined, the next step is to compose those tasks into a “flow”. From the Prefect docs:
A Flow is a container for Tasks. It represents an entire workflow or application by describing the dependencies between tasks.
Flows are DAGs, or “directed acyclic graphs.” This is a mathematical way of describing certain organizational principles:
- A graph is a data structure that uses “edges” to connect “nodes.” Prefect models each Flow as a graph in which Task dependencies are modeled by Edges.
- A directed graph means that edges have a start and an end: when two tasks are connected, one of them unambiguously runs first and the other one runs second.
- An acyclic directed graph has no circular dependencies: if you walk through the graph, you will never revisit a task you’ve seen before.
Because we want this job to run on a schedule, the code below provides one additional argument to Flow(): a special “schedule” object. In this case, the code says “run this flow every minute”.
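The schedule object itself isn’t defined in the block below; as a sketch (assuming Prefect 0.x/1.x core), a one-minute interval schedule looks like this:
from datetime import timedelta
from prefect.schedules import IntervalSchedule

# run the flow once per minute
schedule = IntervalSchedule(interval=timedelta(minutes=1))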
Note
Prefect flows do not have to be run on a schedule. To test a single run, just omit schedule from the code block below.
with Flow('ticket-model-evaluation', schedule) as flow:
batch_size = Parameter(
'batch-size',
default=1000
)
trial_id = get_trial_id()
# pull sample data
sample_ticket_df = get_ticket_data_batch(batch_size)
# compute target
actuals = get_target(sample_ticket_df)
# get prediction
preds = predict(sample_ticket_df)
# compute evaluation metrics
mae = evaluate_model(actuals, preds, "mae")
medae = evaluate_model(actuals, preds, "medae")
mse = evaluate_model(actuals, preds, "mse")
r2 = evaluate_model(actuals, preds, "r2")
    # get trial summary as a dict
trial_summary = get_trial_summary(
trial_id=trial_id,
input_df=sample_ticket_df,
actuals=actuals,
metrics={
"MAE": mae,
"MedAE": medae,
"MSE": mse,
"R2": r2
}
)
# store trial summary
trial_complete = write_trial_summary(trial_summary)
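With the flow built, you can optionally render the DAG Prefect inferred to confirm the task dependencies look right (this assumes the optional graphviz dependency is installed; it is not required for anything else in this section):
flow.visualize()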
If you execute flow.run(), the flow will execute locally. It’s a good idea to start locally, with a smaller subset of your data, before scaling up.
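For example, because batch-size was declared as a Parameter, you can trigger a single local run on a smaller sample like this (the value 100 is an arbitrary choice):
# one local run over a smaller sample of tickets
flow.run(parameters={"batch-size": 100})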
Scaling up with Dask Clusters
LocalCluster
A Dask LocalCluster is good for taking advantage of all the cores on your machine. If you only need 512 GB of RAM and 64 cores, spinning up a large instance and using a Dask LocalCluster is a great idea.
from prefect.engine.executors import DaskExecutor
flow.run(executor=DaskExecutor())
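DaskExecutor also accepts cluster_kwargs, so you can control the size of the local cluster it creates; a sketch (the worker and thread counts here are arbitrary):
# run the flow on a local Dask cluster of an explicit size
flow.run(executor=DaskExecutor(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 4}
))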
Multi-Node Cluster
from dask_saturn import SaturnCluster
from prefect.engine.executors import DaskExecutor
flow.run(executor=DaskExecutor(
cluster_class=SaturnCluster,
cluster_kwargs=dict(
scheduler_class="xlarge",
worker_class="16xlarge",
nprocs=16,
        nthreads=4,
        n_workers=5,
        autoclose=True
)))
This tells Prefect to create a SaturnCluster when it starts the flow, and stop it (autoclose) when it’s complete.
Note
You get one SaturnCluster for your Jupyter instance. If you already have one, the constructor parameters will be ignored, and your existing SaturnCluster will be used.
Deploy a Prefect flow in Saturn
Notebooks are great for rapid prototyping and interactive work, but since this flow needs to run on a schedule, it would be better to run it in an isolated environment that can be started, stopped, and scaled.
Saturn supports persistent and scheduled deployments. Persistent deployments are always on. If you want to run this flow every minute, we recommend baking the schedule into the flow and using a persistent deployment. If you want to run the flow every day, we recommend removing the schedule from the flow and using a scheduled deployment instead (running it on a cron schedule).
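For the scheduled-deployment case, a minimal sketch is to leave the schedule off the flow and let the deployment’s cron schedule trigger one run per invocation (the __main__ guard and script layout are assumptions, not part of the original notebook):
if __name__ == "__main__":
    # a scheduled Saturn deployment runs this script on its cron schedule,
    # so each invocation performs exactly one run of the flow
    flow.run()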