A modern ETL pattern using Prefect’s S3 Storage and Kubernetes runtime

This post is a tutorial on patterns for Python ETLs leveraging Prefect workflow orchestration. Prefect makes your Python jobs easy to repeat, manage, scale, and understand.

Nate Nowack | Solutions Engineer

January 10, 2022

Since I’m fond of cats, the Great British Baking Show, and dragging out analogies…I’ll say “there are many ways to bake a cake” when it comes to creating modern ETL-like jobs moving data from point A to point B.
However, for data engineers, the challenge is rarely baking a single cake once—it’s in baking many types of cake on interdependent schedules, in a customizable manner that’s easy to repeat, manage, scale, and understand.
Prefect is an orchestration engine that takes your family-favorite cake recipes and makes them easy to repeat, manage, scale, and understand — erm… well, I mean, just as long as the cake recipes are actually Python jobs.
Below I’ll lay out a Prefect ETL pattern with 3 main considerations in mind:
  • rapid deployment—addressing Docker storage’s drawbacks at scale, i.e. having to build and push a Docker image to a registry whenever code changes
  • scalability—offered by containerized runtime environments like EKS
  • modularization—clear categorization of various Python jobs (flows) into Prefect Projects, based on intended outcomes and required dependencies
What do I need in order to use a pattern like this?
This specific example will require a few pieces of supporting infrastructure:
  • a Prefect Cloud account (and a project to register flows against)
  • an S3 bucket to store flow code in
  • an EKS cluster to run flows on
  • an ECR repo to hold the project’s base image
  • a Prefect Kubernetes Agent running against the cluster
I’m using AWS for cloud, but this same pattern works for GCP / Azure.
Why are we using Prefect again?
We’ll use a few of Prefect’s convenient features (* → Prefect Cloud only):
  • S3 Storage—flows stored as scripts in an S3 bucket
  • KubernetesRun—a run config that launches each flow run on a Kubernetes pod
  • task mapping—dynamic parallelism across a task’s inputs
  • Parameters—runtime-configurable flow inputs
  • KV Store*—stateful key-value storage shared across flow runs
How does all of this piece together? i.e., code plz
Most directly, we can build our flows using Prefect’s Functional API. In less true but generally Pythonic terms, we throw some specially @task-decorated Python functions in a Flow context manager and let Prefect go to work:
Wait, when does it run?
Here’s what this Flow context looks like as a DAG.
By running the Python file as is, Prefect will use our Flow context definition (lines 47–50 from above) to build a DAG (Directed Acyclic Graph, i.e. execution plan). The DAG is inferred by Prefect based on how our task objects are called and how return values are passed between them.
Let’s break down these 4 lines, since there’s a fair amount happening here:
  • We define a data_sources Parameter as a JSON-like List[Dict], where each entry describes what requests.get() needs to extract that source.
  • We tell Prefect to dynamically map the extract() task across each source in data_sources, producing raw_data: List[raw_piece_of_data]
  • We map our transform() task over the raw_data from extract() (which is inferred to be an upstream task, i.e. transform if extract succeeds)
  • The terminal task will load() our squeaky clean_data to our destination of choice—storing any relevant flow results in a bucket.
Now that our flow is built, we can use flow.run() outside the flow context to trigger flow runs locally:
if __name__ == "__main__":
    flow.run()
… or we can also use the CLI or the Prefect Cloud UI to trigger flow runs.
At this point, we can create a Prefect Cloud project and assign this flow to it:
prefect register -p flow.py --project DEMO
This registration will upload our flow as a script to the bucket using the {PROJECT}/flow.py key defined in the S3_storage object—no need to build and push a whole new Docker image to deploy iterative changes or fixes.
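For reference, here is a hedged sketch of what that S3_storage object might look like (the bucket name and project key below are placeholders):

```python
from prefect import Flow
from prefect.storage import S3

flow = Flow("demo-etl")

# stored_as_script=True uploads the flow file itself at registration time,
# so iterating on code means re-registering, not rebuilding a Docker image
flow.storage = S3(
    bucket="my-prefect-flows",     # assumption: your bucket name
    key="DEMO/flow.py",            # the {PROJECT}/flow.py key convention
    stored_as_script=True,
    local_script_path="flow.py",   # path to this file on disk
)
```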
The flow will now be loaded from S3 and executed on the EKS cluster as prompted by our Kubernetes Agent, which can be based on a Schedule, manual triggers from the UI, or event-driven Client / GraphQL API calls.
To handle dependencies, I built a custom image on top of the Prefect Python 3.9 base image, including the Python libraries common to this (demo) Prefect project. I pushed that project’s image to an AWS ECR repo, stored the image URL in the Prefect KV store as {PROJECT}_BASE_IMAGE, and provided it to KubernetesRun() as the image to run on a pod at runtime.
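Sketched out, that wiring could look like this (the flow name and KV key are assumptions following the {PROJECT}_BASE_IMAGE convention; get_key_value requires a Prefect Cloud backend):

```python
from prefect import Flow
from prefect.backend import get_key_value
from prefect.run_config import KubernetesRun

flow = Flow("demo-etl")

# Look up the ECR image URL stored under the project's KV key,
# e.g. "<account>.dkr.ecr.<region>.amazonaws.com/demo:latest"
image = get_key_value("DEMO_BASE_IMAGE")

# Each flow run now launches on an EKS pod built from that image
flow.run_config = KubernetesRun(image=image)
```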
A couple thoughts on scalability
These data sources could easily be defined elsewhere as a JSON-like object, loaded at runtime, and passed to flow runs as Parameters. A good example of ‘elsewhere’ would be the Prefect KV store, where you can store and update stateful values for use in Flow runs, like this example.
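For instance (the key name below is hypothetical, `flow` is the ETL flow from earlier in the post, and both KV calls require an authenticated Prefect Cloud backend):

```python
from prefect.backend import get_key_value, set_key_value

# Store the sources once, and update them whenever they change...
set_key_value(
    key="demo_data_sources",
    value=[{"url": "https://example.com/api/a"}],
)

# ...then load them at runtime and hand them to a flow run as a Parameter
sources = get_key_value(key="demo_data_sources")
flow.run(parameters={"data_sources": sources})
```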
In order to further parallelize our mapped tasks in production, we could use Dask Kubernetes’ KubeCluster, as outlined in a great article by George Coyne.
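A hedged sketch of that executor swap, assuming the `flow` object from earlier and placeholder cluster sizing (this also requires the dask-kubernetes package in the flow’s image):

```python
from prefect.executors import DaskExecutor

# Spin up an ephemeral Dask cluster on Kubernetes for each flow run,
# so mapped tasks execute in parallel across worker pods
flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"n_workers": 4},  # assumption: sizing varies per workload
)
```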
This pattern was designed to be extensible across projects, teams, and organizations. I chose to store my flows in sub-divisions of my S3 bucket according to Prefect Project, but flows can be stored in buckets in whatever logical grouping of S3 blobs is most convenient for the project at hand.
Any questions on patterns for Python ETLs leveraging Prefect workflow orchestration? Don’t hesitate to reach out to us in our Slack Community.
Happy engineering!