Your Python job is already a Prefect flow, it just doesn’t know it yet
May 26, 2022

Turning your Python code into a scheduled, monitored pipeline that runs in parallel has never been easier.

Nate Nowack, Solutions Engineer

Prefect 2.0 has graduated to beta status and it’s time to start kicking the tires!

The new iteration of the Prefect workflow orchestrator sets fire to clunky concepts historically central to data pipeline creation (notably DAGs) and allows us to write regular Python code as workflows without forcing us to refactor it just to play nice with more, uhh, idiosyncratic frameworks.

A unique benefit of this hands-off philosophy is the control to adopt as many (or few) Prefect features as we actually want in our workflows, for example:

You want caching but don’t need distributed compute? Cool!

Vice versa? Also cool.

Both?

[GIF: Troy and Abed from Community. Caption: RIP Troy and Abed In the Morning]

To see where the rubber meets the road, I’ll dive into a Chess.com-themed flow that highlights the naturally Pythonic way we can build workflows with Prefect.

I have a Python script that takes a list of Chess.com usernames and writes those users’ games to S3 as parquet files built from pandas DataFrames, with one row per game.

Portable Game Notation (PGN) is the human-readable format in which chess games are represented as text.

You’ll need just two things to complete this example:

  • A Python environment (I used a conda environment) with the following packages installed:

pip install chess.com boto3 pandas python-chess prefect==2.0b2

  • A file path to write the games to! (I used an S3 bucket.)
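For a rough picture of a script like this, here is a minimal plain-Python sketch. It has some assumptions built in. It calls chess.com's public published-data API directly with urllib instead of using the chess.com client package from the install line. The archives endpoint lists one URL per month, and each of those returns a "games" list. Every name except get_games and orca is hypothetical. Also note that pandas needs pyarrow or fastparquet to write parquet, and s3fs to write to an s3:// path.

```python
import json
import urllib.request

import pandas as pd

# Assumed endpoint: chess.com's public API lists one archive URL per month.
ARCHIVES_URL = "https://api.chess.com/pub/player/{username}/games/archives"


def fetch_json(url: str) -> dict:
    """GET a chess.com public-API URL and decode the JSON body."""
    # A User-Agent header is sent because chess.com may reject anonymous clients.
    req = urllib.request.Request(url, headers={"User-Agent": "chess-games-example"})
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def get_archive_urls(username: str) -> list:
    """List one player's monthly archive URLs (hypothetical helper)."""
    return fetch_json(ARCHIVES_URL.format(username=username))["archives"]


def games_to_frame(games: list) -> pd.DataFrame:
    """One row per game; nested dicts such as 'white' become 'white.username' etc."""
    return pd.json_normalize(games)


def get_games(url: str) -> pd.DataFrame:
    """Fetch one month of games and return them as a DataFrame."""
    print(f"Game URL: {url}")
    return games_to_frame(fetch_json(url)["games"])


def load_games(df: pd.DataFrame, path: str) -> None:
    """Write one month of games to parquet (an s3:// path needs s3fs installed)."""
    df.to_parquet(path, index=False)


def orca(usernames: list, base_path: str) -> None:
    """Driver: fetch every month of games for every user, one request at a time."""
    for username in usernames:
        urls = get_archive_urls(username)
        monthly = [get_games(url) for url in urls]  # sequential API calls
        for url, df in zip(urls, monthly):
            # Archive URLs end in .../games/YYYY/MM; reuse that for the file name.
            year, month = url.rstrip("/").split("/")[-2:]
            load_games(df, f"{base_path}/{username}/{year}-{month}.parquet")
```

Calling orca(["some_username"], "s3://your-bucket/chess-games") would then write one parquet file per month for that player. The username and bucket are placeholders.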

So, okay — we have some Python code that will load a user’s chess games into S3 for later analysis… what now?

While this is just a fun use case, it can also be an illustrative example of how we can incrementally adopt Prefect features to tackle inefficiencies in our workflows. Take the following considerations for example:

  • Every time I run the code, get_games pulls every month’s games again, even the ones we’ve already loaded. We can avoid duplicating this work automatically with caching!

  • In our orchestrator function (which I named orca 😅), we call the get_games function in a list comprehension to fetch each month’s games. In pure Python, that means our API calls happen one after another. To make the code faster, we can parallelize it just by decorating get_games with @task to turn it into a Prefect Task and adding a DaskTaskRunner to our flow so that Dask Distributed runs the tasks in parallel.
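To see why the list comprehension is slow, here is a small standard-library illustration. It deliberately uses a thread pool rather than Prefect or Dask. fake_api_call is a hypothetical stand-in that just sleeps, the way a network request mostly waits. Run back to back, the calls' waits add up. In a pool, they overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_api_call(url: str) -> str:
    """Hypothetical stand-in for an HTTP request: it mostly waits on 'the network'."""
    time.sleep(0.2)
    return url


urls = [f"month-{i}" for i in range(5)]

# Plain list comprehension: each call starts only after the previous one returns.
start = time.perf_counter()
sequential_results = [fake_api_call(url) for url in urls]
sequential_secs = time.perf_counter() - start

# Thread pool: all five waits overlap; map() still returns results in input order.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(urls)) as pool:
    threaded_results = list(pool.map(fake_api_call, urls))
threaded_secs = time.perf_counter() - start

print(f"sequential: {sequential_secs:.2f}s, threaded: {threaded_secs:.2f}s")
```

Both versions return the same results. Only the total wall-clock time differs, and that gap is what a parallel task runner recovers for I/O-bound calls like fetching each month's games.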

Let’s see how we can make these changes!

First things first: to reap the benefits of Prefect’s features, let’s decorate our existing Python functions with @task and our orca function (i.e., the driver, main, and so on) with @flow.

And presto change-o, we have ourselves a Prefect Flow! If you run the code now, you’ll see Prefect’s fancy logging in action.

So we’re grabbing games from the chess.com API and packing them into pandas DataFrames, right? This is a good first step. But instead of writing and testing extra Python logic to check which games we’ve already fetched and loaded, let’s use caching. Prefect can save the results of our functions (which it calls tasks once they’re wrapped with the @task decorator), so our workflow just remembers what we’ve done. We do that by passing cache_key_fn=task_input_hash to the @task decorator on get_games.

What’s going on with this cache_key_fn=task_input_hash in the decorator?

Basically, task_input_hash creates a unique string (a hash) from the arguments passed to the get_games task, which in this case is just the url. Prefect then saves the result of the get_games task and associates it with that hash. If the same hash shows up in a future invocation of the task, Prefect knows we already did that work.
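The idea can be modeled in a few lines of plain Python. This toy version is not Prefect's implementation. It builds a deterministic key from a function's arguments and calls the function only for keys it hasn't seen. The names input_hash and cache_by_inputs are hypothetical, and arguments must be JSON-serializable. Unlike Prefect, which persists task results so the cache survives between flow runs, this cache lives only as long as the Python process.

```python
import hashlib
import json
from functools import wraps


def input_hash(*args, **kwargs) -> str:
    """Deterministic key from a call's arguments (JSON-serializable arguments only)."""
    payload = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def cache_by_inputs(fn):
    """Run fn once per distinct set of inputs; repeat calls return the stored result."""
    store = {}

    @wraps(fn)
    def wrapper(*args, **kwargs):
        key = input_hash(*args, **kwargs)
        if key not in store:
            store[key] = fn(*args, **kwargs)  # only executes on a cache miss
        return store[key]

    wrapper.store = store
    return wrapper


calls = []


@cache_by_inputs
def get_games(url):
    calls.append(url)  # records real executions, like the "Game URL" log line
    return f"games from {url}"


get_games("https://example.com/2022/04")
get_games("https://example.com/2022/04")  # same input -> same key -> cached
get_games("https://example.com/2022/05")
print(calls)
```

The repeated url never reaches the function body. For the same reason, a cached get_games run doesn't log its Game URL.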

To put it succinctly: if we already saw the input, we already have the output!

With caching in place, let’s run the flow again and look at the logs.

Notice anything different?

Remember how we logged the Game URL in get_games? That output is gone this time. The Prefect engine recognized each url from the previous run, so it didn’t have to run that code. It just pulled the task result associated with that url from the cache and left the get_games task runs in a Cached state.

Prefect Storage can be configured to cache your task results. Try the CLI command prefect storage create --help.

So far, we’ve accomplished an efficiently lazy workflow — let’s see if we can make it faster and more distributed by exploring the Prefect DaskTaskRunner.

Although Prefect already executes non-blocking code concurrently by default, we can enable true parallelism with Dask by passing a DaskTaskRunner to the task_runner argument of the flow decorator.

With that change, you’ll notice that both the get_games and load_games tasks are now executed in parallel by Dask, even though we’re still just using a list comprehension and for loops!

I hope this shows how easy it is to start using Prefect’s newest offering. With a few simple annotations, your Python jobs, written the way you’d prefer to write them, become robust, distributed workflows!

Happy Engineering!
