Prefect 2.0 has graduated to beta status and it’s time to start kicking the tires!
The new iteration of the Prefect workflow orchestrator sets fire to clunky concepts historically central to data pipeline creation (notably DAGs) and allows us to write regular Python code as workflows without forcing us to refactor it just to play nice with more, uhh, idiosyncratic frameworks.
A unique benefit of this hands-off philosophy is the control to adopt as many (or few) Prefect features as we actually want in our workflows, for example:
You want caching but don’t need distributed compute? Cool!
Vice versa? Also cool.
Example use case: you like chess?
To put the rubber to the road, I’ll dive into a chess.com-themed flow, highlighting the naturally Pythonic way we can build workflows with Prefect.
What’s my example?
💡 Did you know?
Portable Game Notation (PGN) is the human-readable format in which chess games are represented as text.
What do I need to follow along and run the code?
You’ll need just two things to complete this example:
A Python environment (I used a conda venv) like the following:
pip install chess.com boto3 pandas python-chess prefect==2.0b2
A file path to write the games to! (I used an S3 Bucket.)
The script, without using any Prefect features
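Here's a minimal sketch of what that plain-Python starting point could look like. The names get_games, load_games, and orca come from this article; USERNAME, S3_PATH, and the month_from_url helper are illustrative stand-ins, not the article's exact code.

```python
# A sketch of the pre-Prefect script. Assumes `pandas`, `requests`, and an
# S3-compatible path (pandas writes to s3:// via s3fs/boto3 under the hood).
import requests
import pandas as pd

USERNAME = "some_chess_com_user"        # assumed placeholder
S3_PATH = "s3://my-chess-bucket/games"  # assumed placeholder

def month_from_url(url: str) -> str:
    """Turn an archive URL ending in '.../games/2022/03' into '2022-03'."""
    year, month = url.rstrip("/").rsplit("/", 2)[-2:]
    return f"{year}-{month}"

def get_games(url: str) -> pd.DataFrame:
    """Fetch one month of games from a chess.com archive URL."""
    print(f"Game URL: {url}")
    games = requests.get(url).json().get("games", [])
    return pd.DataFrame(games)

def load_games(games: pd.DataFrame, month: str) -> None:
    """Write one month of games to S3 as parquet."""
    games.to_parquet(f"{S3_PATH}/{month}.parquet")

def orca():
    """Orchestrator: list the player's archive URLs, fetch and load each month."""
    archives = requests.get(
        f"https://api.chess.com/pub/player/{USERNAME}/games/archives"
    ).json()["archives"]
    monthly = [get_games(url) for url in archives]  # sequential API calls
    for url, games in zip(archives, monthly):
        load_games(games, month=month_from_url(url))
```

Calling orca() runs the whole pipeline end to end: one API call per month of games, then one write per month.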
So, okay — we have some Python code that will load a user’s chess games into S3 for later analysis… what now?
While this is just a fun use case, it can also be an illustrative example of how we can incrementally adopt Prefect features to tackle inefficiencies in our workflows. Take the following considerations for example:
Every time I run the code, it’s going to pull every month’s games again in get_games. We can avoid duplicating this work automatically, with caching!
In our orchestrator function (which I named orca 😅), we call the get_games function to fetch each month’s games in a list comprehension. Using pure Python in this way, all our API calls happen sequentially. To make our code faster, we can parallelize just by decorating our functions with @task to turn them into Prefect Tasks and adding a DaskTaskRunner to our flow to parallelize the tasks using Dask Distributed.
Let’s see how we can make these changes!
The glow up: Python script → Prefect Flow
First things first — in order to reap the benefits of Prefect features, let’s decorate our existing Python functions with @task and our orca function (i.e. driver, main, and so on) with @flow, which will give us:
… and presto change-o, we have ourselves a Prefect Flow! If you run the code now, you’ll see that we have some fancy logging action going on:
So we’re grabbing games from the chess.com API and packing them into Pandas DataFrames, right? This is a good first step, but instead of writing and testing extra Python logic to check which games we’ve already fetched and loaded, let’s use caching to save the results of our functions (called tasks once wrapped with the @task decorator) so our workflow just remembers what we’ve done. That would look like:
What’s going on with this cache_key_fn=task_input_hash in the decorator? task_input_hash creates a unique string (a hash) based on the content of the arguments passed to the get_games task (just the url in this case). Prefect then saves the result of the get_games task and associates the hash with it, so that if we ever see that hash again in a future invocation of the task, Prefect knows we already did that work.
To put it succinctly: if we already saw the input, we already have the output!
Having added caching functionality, let’s run it and look at the logs again:
Notice anything different?
Remember how we logged the Game URL in get_games? There’s no such output anymore, since the Prefect engine realized it’s seen that url before, so it didn’t have to run that code. It just pulled the task result associated with that url from our cache, and left the get_games tasks in a Cached state.
💡 Did you know?
Prefect Storage can be configured to cache your task results. Try the CLI command prefect storage create --help.
So far, we’ve accomplished an efficiently lazy workflow — let’s see if we can make it faster and more distributed by exploring Prefect task runners.
Parallelism — avoid slow sequential execution
Although Prefect will execute non-blocking code concurrently by default, we can enable true parallelism with Dask by passing a DaskTaskRunner to the task_runner argument of our flow, like:
… and you’ll notice that both the get_games and load_games tasks are now executed in parallel by Dask (even though we’re just using list comprehensions and for loops)!
… and that’s a wrap!
I hope this conveyed how easy it is to start leveraging Prefect’s newest offering — elevating your Python jobs into robust, distributed workflows with a few simple decorators applied to your code, as you’d prefer to write it!