A modern ETL pattern using Prefect’s S3 Storage and Kubernetes runtime
Jan. 10, 2022

This post is a tutorial on patterns for Python ETLs leveraging Prefect workflow orchestration. Prefect makes your Python jobs easy to repeat, manage, scale, and understand.

Nate Nowack, Solutions Engineer

Since I’m fond of cats, the Great British Baking Show, and dragging out analogies…I’ll say “there are many ways to bake a cake” when it comes to creating modern ETL-like jobs moving data from point A to point B.

However, for data engineers, the challenge is rarely baking a single cake once—it’s in baking many types of cake on interdependent schedules, in a customizable manner that’s easy to repeat, manage, scale, and understand.


The Prefect logo

Prefect is an orchestration engine that takes your family-favorite cake recipes and makes them easy to repeat, manage, scale, and understand — erm… well, I mean, just as long as the cake recipes are actually Python jobs.

Below I’ll lay out a Prefect ETL pattern with 3 main considerations in mind:

  • rapid deployment—addressing Docker storage’s main drawback at scale: having to build and push a Docker image to a registry whenever code changes

  • scalability—offered by containerized runtime environments like EKS

  • modularization—clear categorization of various Python jobs (flows) into Prefect Projects, based on intended outcomes and required dependencies

This specific example will require a few pieces of supporting infrastructure:

  • an S3 bucket to hold our flow code

  • a Kubernetes cluster (EKS here) where flow runs will execute

  • a container registry (AWS ECR here) for the project’s base image

  • a Prefect Kubernetes Agent with access to that cluster, to pick up and submit flow runs

I’m using AWS for cloud, but this same pattern works for GCP / Azure.

We’ll use a few of Prefect’s convenient features (* → Prefect Cloud only):

  • S3 Storage, to load flow code from a bucket at runtime

  • the KubernetesRun run config, to execute each flow run on a pod in the cluster

  • Parameters and task mapping, to fan tasks out over our data sources

  • the KV Store*, to keep small stateful values like the project’s base image URL

Most directly, we can build our flows using Prefect’s Functional API. In plainer, more Pythonic terms: we throw some specially @task-decorated Python functions into a Flow context manager and let Prefect go to work:
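To make this concrete, here’s a minimal sketch of what such a flow file could look like with the Prefect 1.x API; the bucket name, image URL, and API endpoint are hypothetical placeholders:

```python
import requests

from prefect import Flow, Parameter, task
from prefect.run_configs import KubernetesRun
from prefect.storage import S3

PROJECT = "DEMO"

# flow code lives as a script in S3, under a per-project prefix
S3_storage = S3(
    bucket="my-prefect-flows",  # hypothetical bucket name
    key=f"{PROJECT}/flow.py",
    stored_as_script=True,
    local_script_path="flow.py",
)

# each flow run executes on a Kubernetes pod using the project's base image
# (in this post, the image URL is pulled from the Prefect KV Store; see below)
run_config = KubernetesRun(image="<account>.dkr.ecr.us-east-1.amazonaws.com/demo-etl:latest")


@task
def extract(data_source: dict) -> dict:
    """Pull one raw piece of data from a single source."""
    response = requests.get(data_source["url"], params=data_source.get("params"))
    response.raise_for_status()
    return response.json()


@task
def transform(raw_piece_of_data: dict) -> dict:
    """Clean / reshape one raw record."""
    return {key.lower(): value for key, value in raw_piece_of_data.items()}


@task
def load(clean_data: list) -> None:
    """Load the cleaned records to the destination of choice."""
    print(f"loading {len(clean_data)} records")


with Flow("demo-etl", storage=S3_storage, run_config=run_config) as flow:
    data_sources = Parameter(
        "data_sources", default=[{"url": "https://api.example.com/v1/orders"}]
    )
    raw_data = extract.map(data_sources)
    clean_data = transform.map(raw_data)
    load(clean_data)
```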

Here’s what this Flow context looks like as a DAG.

DAG illustration

DAG generated using flow.visualize

By running the Python file as is, Prefect will use our Flow context definition (the four lines inside the with Flow(...) block above) to build a DAG (Directed Acyclic Graph, i.e. an execution plan). The DAG is inferred by Prefect based on how our task objects are called and how return values are passed between them.

Let’s break down these 4 lines, since there’s a fair amount happening here:

  • We define a data_sources Parameter as a JSON-like List[Dict], where each dict holds what requests.get() would need to extract that source (see the example after this list).

  • We tell Prefect to dynamically map the extract() task across each source in data_sources, producing raw_data: List[raw_piece_of_data].

  • We map our transform() task over the raw_data from extract() (which Prefect infers to be an upstream task, i.e. transform runs only if extract succeeds).

  • The terminal task will load() our squeaky clean_data to our destination of choice—storing any relevant flow results in a bucket.
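As a purely hypothetical example, a data_sources value could look something like this, with each dict holding exactly what requests.get() needs for one source:

```python
# hypothetical data_sources value: one dict per source, shaped around requests.get()
DATA_SOURCES = [
    {"url": "https://api.example.com/v1/orders", "params": {"since": "2022-01-01"}},
    {"url": "https://api.example.com/v1/customers", "params": {}},
    {"url": "https://api.example.com/v1/products", "params": {"active": "true"}},
]
```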

Now that our flow is built, we can use flow.run() outside the flow context to trigger flow runs locally:

if __name__ == "__main__":
    flow.run()

… or we can use the CLI or the Prefect Cloud UI to trigger flow runs.
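Once the flow is registered with a backend (next step), kicking off a run from the CLI looks something like this, using the flow and project names from this demo:

prefect run --name demo-etl --project DEMO --watch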

At this point, we can create a Prefect Cloud project and assign this flow to it:

prefect register -p flow.py --project DEMO

This registration will upload our flow as a script to the bucket using the {PROJECT}/flow.py key defined in the S3_storage object—no need to build and push a whole new Docker image to deploy iterative changes or fixes.

The flow will now be loaded from S3 and executed on the EKS cluster whenever our Kubernetes Agent picks up a flow run, whether that run comes from a Schedule, a manual trigger in the UI, or an event-driven Client / GraphQL API call.
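As a hedged sketch of two of those trigger mechanisms, we could attach an interval Schedule to the flow before registering it, or kick off an ad-hoc run through the Client (the flow ID below is a placeholder):

```python
from datetime import timedelta

from prefect import Client
from prefect.schedules import IntervalSchedule

from flow import flow  # the flow object from the sketch above

# attach a schedule before `prefect register`, so the agent picks up runs automatically
flow.schedule = IntervalSchedule(interval=timedelta(hours=1))

# ...or trigger a run on demand (event-driven) through the Client / GraphQL API
Client().create_flow_run(flow_id="<registered-flow-id>", run_name="ad-hoc run")
```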

To handle dependencies, I built a custom image on top of the Prefect Python 3.9 base image, including Python libraries hypothetically common to this (demo) Prefect project. I pushed that project’s image to an AWS ECR repo, stored the image URL in the Prefect KV Store as {PROJECT}_BASE_IMAGE, and provided it to KubernetesRun() as the image to run on a pod at runtime.
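Here’s roughly what that looks like in code, assuming a Prefect Cloud backend (the KV Store is Cloud-only) and a placeholder ECR URL:

```python
from prefect.backend import get_key_value, set_key_value
from prefect.run_configs import KubernetesRun

PROJECT = "DEMO"

# set once, or whenever the project's base image changes (placeholder ECR URL)
set_key_value(
    f"{PROJECT}_BASE_IMAGE",
    "<account>.dkr.ecr.us-east-1.amazonaws.com/demo-etl:latest",
)

# read at flow-definition time and hand to the run config
run_config = KubernetesRun(image=get_key_value(f"{PROJECT}_BASE_IMAGE"))
```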

These data sources could easily be defined elsewhere as a JSON-like object, loaded at runtime, and passed to flow runs as Parameters. A good example of ‘elsewhere’ would be the Prefect KV store, where you can store and update stateful values for use in Flow runs, like this example.
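A minimal sketch of that approach, using a hypothetical DEMO_DATA_SOURCES key:

```python
import json

from prefect.backend import get_key_value, set_key_value

# store (and later update) source definitions without touching flow code
set_key_value(
    "DEMO_DATA_SOURCES",
    json.dumps([{"url": "https://api.example.com/v1/orders", "params": {}}]),
)

# load the current value when kicking off a run and pass it as the data_sources
# Parameter, e.g. flow.run(parameters={"data_sources": data_sources})
data_sources = json.loads(get_key_value("DEMO_DATA_SOURCES"))
```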

In order to further parallelize our mapped tasks in production, we could use Dask Kubernetes’ KubeCluster, as outlined in a great article by George Coyne.
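A minimal sketch of that swap, assuming dask-kubernetes is installed in the project’s base image (the scaling bounds are placeholders):

```python
from prefect.executors import DaskExecutor

from flow import flow  # the flow object from the sketch above

# run mapped tasks in parallel on an ephemeral Dask cluster spun up on Kubernetes
flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    adapt_kwargs={"minimum": 2, "maximum": 10},
)
```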

This pattern was designed to be extensible across projects, teams, and organizations. I chose to store my flows in sub-divisions of my S3 bucket according to Prefect Project, but flows can be stored in whatever logical grouping of S3 blobs is most convenient for the project at hand.

Any questions on patterns for Python ETLs leveraging Prefect workflow orchestration? Don’t hesitate to reach out to us in our Slack Community.

Happy engineering!
