Prefect: Zero to Hero
Aug. 27, 2021

From no experience with data tooling to fully automated Prefect pipelines

Yueh Han Huang, Software Engineering Intern

Prefect is a modern Python package for building, running and monitoring data pipelines. In this article, I’ll share how I used Prefect for my data visualization project.

Since I have a background in data analytics, I’m used to performing data transformation with well-prepared data, typically a Kaggle dataset or clean CSV file.

But, to be honest, data analysis by itself is often not enough for a real-world application. One day, I had the idea of developing a side project that visualizes shared-bike information in my city. Luckily, I found an open API on the city government’s website, which updates the number of available bikes every five minutes.

[Image: Ubike data]

Just as I got excited to jump in, I realized a problem: when I sent requests to the API endpoint, it only returned the current shared-bike data. Without a data pipeline that fetches and saves the data, I could not accumulate any historical data.

But I still felt lucky, because the problem seemed solvable if I could build my own data pipeline. I looked for solutions and found that Prefect fits my needs well. If you’re in a similar situation or just want to learn more about Prefect, consider following my journey in this tutorial! I’ll be covering my stack:

  • Prefect 1.0 (aka Core) - an open-source data workflow automation tool

  • Prefect Cloud - an orchestration layer that helps you schedule, monitor, and debug your data pipeline

  • DigitalOcean - an accessible cloud computing service to run your Python code

Our project goal is to create a data pipeline that fetches data from an API endpoint. Once we extract the data, we want to transform it (for example, adding custom columns) and load/save it to our computer.

[Image: ETL diagram]

Prefect is a scalable tool that manages the state of this ETL process, and Prefect 1.0 is an open-source package you can install and use for free. Simply run pip install prefect and you get access to a powerful workflow orchestration engine.
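If pip install prefect ever pulls in a newer major version than the one covered here, you can pin to the 1.x line explicitly:

>> pip install "prefect<2"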

A sample code snippet shows how the work gets done. You can find this example by cloning this repository, and don’t forget to install the dependencies before you run the code.

[Image: Flows with vs. without Prefect]

To use Prefect 1.0 in Python, we use the @task decorator to turn each function into a Prefect Task. On top of that, we use a Prefect Flow object to contain and manage these tasks. Here’s a diagram that explains what Prefect does at a high level.

[Image: ETL with Prefect diagram]

With a Prefect Flow and Prefect Tasks working together, the state of the pipeline is managed for us.

[Image: The value of Prefect]
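To make this concrete, here’s a minimal sketch of the pattern. The endpoint URL and field names are placeholders rather than the real city API:

import os

import pandas as pd
import requests
from prefect import task, Flow

API_URL = "https://example.com/ubike.json"  # placeholder for the real open-data endpoint

@task
def extract():
    # Fetch the current snapshot of shared-bike data
    response = requests.get(API_URL)
    response.raise_for_status()
    return response.json()

@task
def transform(records):
    # Add a custom column, e.g. a timestamp for when we fetched the data
    df = pd.DataFrame(records)
    df["fetched_at"] = pd.Timestamp.utcnow()
    return df

@task
def load(df):
    # Append this snapshot to a local CSV so history accumulates
    exists = os.path.exists("bike_data.csv")
    df.to_csv("bike_data.csv", mode="a", header=not exists, index=False)

with Flow("bike-etl") as flow:
    raw = extract()
    load(transform(raw))

Note that calling the tasks inside the with Flow(...) block doesn’t execute them; it builds the dependency graph that Prefect runs later.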

*If you’re not familiar with Python decorators, here’s a browser analogy: if we imagine our browser is a function, then a decorator is like a browser extension. The decorator adds more functionality to the function itself - in our case, managing the state and dependencies of the flow. Just keep in mind that the @ syntax is what makes a Python decorator take effect.
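If it helps, here’s a tiny standalone example of the decorator mechanism, unrelated to Prefect:

import functools

def log_calls(func):
    # Wrap func with extra behavior, like an extension wraps the browser
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"Calling {func.__name__}...")
        result = func(*args, **kwargs)
        print(f"{func.__name__} finished.")
        return result
    return wrapper

@log_calls  # the @ syntax applies the decorator
def fetch_data():
    return "bike data"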

Now that we have the flow in place, we can execute it with the Prefect CLI.

>> prefect run -p flow.py

This command finds the Flow in the file flow.py and executes it. If there’s no error in the flow, we see a nice “Flow run succeeded!” message in our logs. 

[Image: “Flow run succeeded!” log for the bike flow]

Let's drink a cup of water to chill, we have made significant progress so far 🚰

Next, we’re going to see how to connect this workflow to the cloud. 

Now that we have a working Flow, we want to do two more things:

  • We want to schedule and automate it.

  • We want to observe the flow’s execution status and get notified when things go wrong - a Slack notification or email is preferred.

These two needs together make up the idea of orchestration. We want to build a system for the long run, one that is robust and gives us confidence. You might think this is nice to have but too much to build yourself. You’re right, and the good news is you don’t have to: Prefect Cloud is a service for exactly this.

[Image: Bike flow run]


  • First, we sign up for an account on Prefect Cloud.

  • Then, obtain an API key, which authorizes your computer to register the flow. (Visit account settings > create an API key.) A command-line sketch of these steps follows below.

[Image: Getting started with Prefect Cloud]

The process of registering a flow to Prefect Cloud
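Assuming a recent Prefect 1.x CLI, the registration steps look roughly like this (the project name is just an example):

>> prefect auth login --key <YOUR_API_KEY>
>> prefect create project "bike-project"
>> prefect register --project "bike-project" -p flow.py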

[Image: Quick Run button]

The Quick Run button allows you to start a flow run from the web UI

One mistake I made was assuming a flow would be executable right after registration. The step I missed was starting a Prefect Agent on my local computer. The Agent queries Prefect Cloud for scheduled flow runs. If there are any, the agent fetches the flow metadata (the location of the script, for example) and kicks off execution. Without an agent, the execution layer is missing and there’s nothing to execute a flow. (More information in the Prefect Agent docs.)

[Image: Prefect Agent]

How a local agent looks in my terminal - please never share your Prefect Cloud API key; my key was included for informational purposes and has been revoked so it is no longer usable
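Starting a local agent is a single command, assuming you’ve already authenticated with your API key:

>> prefect agent local start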

Agents are a critical part of Prefect’s hybrid model: through the agent design, Prefect Cloud can orchestrate your flows without ever needing access to your code or data.

Once the flow is registered to the cloud and an agent is available for execution, we can hit the Quick Run button to start a new flow run -

[Image: Quick Run]

Or view the flow schematic to understand its task dependencies -

[Image: Flow run schematic]

Or set up a schedule -

[Image: Flow scheduling]
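The schedule can also live in code. A sketch using Prefect 1.0’s built-in schedules (reusing the tasks from the earlier snippet) attaches the five-minute cadence directly to the flow:

from datetime import timedelta
from prefect import Flow
from prefect.schedules import IntervalSchedule

# Run every five minutes, matching the API's update frequency
schedule = IntervalSchedule(interval=timedelta(minutes=5))

with Flow("bike-etl", schedule=schedule) as flow:
    raw = extract()
    load(transform(raw))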

Or create an automation that triggers a notification if the flow fails -

[Image: Automations]
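Automations are configured in the Cloud UI, but if you prefer to keep failure alerts in code, Prefect 1.0 also supports state handlers on the flow itself. Here’s a minimal sketch, with a print call standing in for a real Slack or email notification:

from prefect import Flow

def alert_on_failure(flow, old_state, new_state):
    # Called on every state change; act only when the flow fails
    if new_state.is_failed():
        print(f"Flow failed: {new_state.message}")  # swap in Slack/email here
    return new_state

with Flow("bike-etl", state_handlers=[alert_on_failure]) as flow:
    ...  # tasks as before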

So far, we’ve seen how Prefect 1.0 and Prefect Cloud together can manage Python execution. In our case, building a system that fetches the bike data at a five-minute interval solves the data-sourcing problem.

After running it for over a month, I had collected daily bike data, with confidence that my pipeline was being monitored and that I would get a message in case of any failure.

[Image: CSV list]

Using this collected data, I was able to build a data app that visualized the availability of the bikes. This felt like a huge step for me, as I had previously only worked with ready-to-use data and had never collected data from a source myself.

[Image: Bike Streamlit app]

I deployed the code to DigitalOcean, a self-serve cloud computing service, and set up an agent there. That way, the pipeline kept running even when I closed my computer.

I also used tmux to run the agent, so I could safely disconnect my SSH session from the virtual machine while keeping the terminal session going. This is an essential trick, because the agent has to stay up to communicate with Prefect Cloud.
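The tmux workflow is short (the session name is arbitrary):

>> tmux new -s prefect-agent      # start a named session
>> prefect agent local start      # launch the agent inside it
(detach with Ctrl-b d; the agent keeps running)
>> tmux attach -t prefect-agent   # reattach later to check on it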

Finally, the data app is built with Streamlit - a package that turns your data script into a web app. Streamlit apps have great potential to extend into useful mobility apps (such as a basic Citymapper-style web app), or to create interactive interfaces that surface insights about bike demand.
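Here’s a minimal sketch of such an app; the file path and column names are hypothetical, chosen to mirror the pipeline’s output:

import glob

import pandas as pd
import streamlit as st

st.title("Shared-bike availability")

# Concatenate every snapshot the pipeline has saved
files = sorted(glob.glob("data/*.csv"))
df = pd.concat((pd.read_csv(f) for f in files), ignore_index=True)

# Let the user pick a station and chart its availability over time
station = st.selectbox("Station", df["station"].unique())
history = df[df["station"] == station].set_index("fetched_at")
st.line_chart(history["available_bikes"])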

Overall, the above stack allows a Python coder to collect data, build interesting apps, and manage everything with ease.

[Image: My stack]

As a developer with no background in data pipelines, I was able to pick up Prefect along with other modern tools and build my own pipeline. I found the learning experience empowering, and I hope you get inspired too!

If you have any questions, feel free to DM me on Twitter!
