From no experience with data tooling to fully automated Prefect pipelines
Prefect is a modern Python package for building and running data pipelines. In this article, I’ll share how I used Prefect for my data visualization project.
My background & the problem I faced
Since I have a background in data analytics, I’m used to performing data transformation with well-prepared data, typically a Kaggle dataset or clean CSV file.
But, to be honest, data analysis by itself is often not enough for a real-world application. One day, I had the idea of developing a side project that visualizes shared-bike information in my city. Luckily, I found an open-source API on the city government website, which updates the number of bikes available every five minutes.
Just as I got excited to jump in, I realized there was a problem. When I sent a request to the API endpoint, it returned only the current shared-bike data. Without a data pipeline that fetches and saves the data, I could not accumulate any historical data.
But I still felt lucky, because the problem seemed solvable if I could build my own data pipeline. I looked for solutions and found that Prefect fit my needs. If you're in a similar situation or just want to learn more about Prefect, consider following my journey in this tutorial! I’ll be covering my stack:
Prefect Core - an open-source data workflow automation tool
Prefect Cloud - an orchestration layer that helps you schedule, monitor and debug your data pipeline
DigitalOcean - an accessible cloud computing service to run your Python code
01 Turn data ETL tasks into a Flow with Prefect Core 🍃
Our project goal is to create a data pipeline that fetches data from an API endpoint. Once we extract the data, we want to transform it (for example, adding custom columns) and load/save it to our computer.
Prefect is a scalable tool that manages the state of this ETL process, and Prefect Core is an open-source package you can install and use for free. Simply run pip install prefect and we get access to a powerful workflow orchestration engine.
How to use Prefect in Python in a nutshell
A sample code snippet shows how the work is done. You can find this example by cloning this repository, and don’t forget to install the dependencies before you run the code.
To use Prefect Core in Python, we use a task decorator to make each function a Prefect Task. On top of that, we use Prefect Flow objects to contain and manage these tasks. Here's a diagram that explains what Prefect does at a high level.
With Prefect Flow and Prefect Tasks together, the state of the pipeline is managed.
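As a rough illustration of the Task and Flow structure described above, here is a minimal sketch. It assumes the Prefect 1.x API (`task` and `Flow` imported from `prefect`), and everything else in it is hypothetical: the station names, the `bikes.csv` file name, the `bike-etl` flow name, and the stubbed `extract` function that stands in for the real API request. Calling `task(fn)` is the functional equivalent of placing `@task` above the function.

```python
import csv
from datetime import datetime, timezone
from pathlib import Path

FIELDS = ["station", "bikes_available", "fetched_at"]


def extract():
    """Stand-in for the HTTP request to the bike API (hypothetical payload)."""
    return [
        {"station": "Central Park", "bikes_available": 4},
        {"station": "City Hall", "bikes_available": 0},
    ]


def transform(rows, fetched_at=None):
    """Add a custom column: the time at which the snapshot was taken."""
    stamp = fetched_at or datetime.now(timezone.utc).isoformat()
    return [{**row, "fetched_at": stamp} for row in rows]


def load(rows, path="bikes.csv"):
    """Append rows to a CSV file so that history accumulates across runs."""
    target = Path(path)
    write_header = not target.exists()
    with target.open("a", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=FIELDS)
        if write_header:
            writer.writeheader()
        writer.writerows(rows)
    return len(rows)


def build_flow():
    """Wire the three functions into a flow (assumes Prefect 1.x is installed)."""
    from prefect import Flow, task  # imported here so the functions above stay plain Python

    with Flow("bike-etl") as flow:
        raw = task(extract)()          # same effect as decorating extract with @task
        clean = task(transform)(raw)   # passing `raw` makes transform depend on extract
        task(load)(clean)
    return flow
```

Keeping the three steps as plain functions means each one can be tried on its own before Prefect is involved. To use this from a flow file, one option is to assign `flow = build_flow()` at module level so that the flow object exists when the file is loaded.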
On Python Decorators
If you're not familiar with Python decorators, here’s a browser analogy: if we imagine our browser is a function, then a decorator is like a browser extension. The decorator adds more functionality to the function itself - in our case, managing the state and dependencies of the flow. Just keep in mind that the @ syntax is what applies a decorator to a function.
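To make the analogy concrete, here is a toy decorator in plain Python. It is not how Prefect is implemented; `track_state`, the state names, and `add_column` are all made up for illustration. It only shows the general idea of wrapping a function so that extra behavior (here, recording whether the call succeeded) happens around it.

```python
import functools


def track_state(func):
    """Toy decorator: record whether the most recent call succeeded or failed."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception:
            wrapper.state = "Failed"
            raise  # the original error still reaches the caller
        wrapper.state = "Success"
        return result

    wrapper.state = "Pending"  # nothing has run yet
    return wrapper


@track_state
def add_column(rows):
    """The decorated function itself knows nothing about state tracking."""
    return [{**row, "source": "bike-api"} for row in rows]
```

After calling `add_column(...)`, the attribute `add_column.state` reflects the outcome of that call, even though the function body never mentions state.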
Now that we have the flow in place, we can execute it with the Prefect CLI.
>> prefect run -p flow.py
This command finds the Flow in the file flow.py and executes it. If there’s no error in the flow, we see a nice “Flow run succeeded!” message in our logs.
Let's drink a cup of water to chill, we have made significant progress so far 🚰
Next, we’re going to see how to connect this workflow to the cloud.
02 Taking the flow to the Cloud ☁️
Now that we have a working Flow, we want to do two more things:
We want to schedule and automate it.
We want to observe the flow execution status and get informed when things go wrong, preferably through a Slack notification or an email.
These two goals together are the idea of orchestration. We want to build a system for the long run that is robust and gives us confidence. You might think this is nice to have but too much to build yourself. You’re right, and the good news is you don’t have to. Prefect Cloud is a service for this.
Using Prefect Cloud to Orchestrate our Workflow 🌊
How to use Prefect Cloud (more information in Prefect’s Getting Started docs)
First, we sign up for an account on Prefect Cloud.
Then, obtain an API key, which authorizes your computer to register the flow. (Visit account setting > create API key)
The process of registering a flow to Prefect Cloud
The Quick Run button allows you to start a flow run from the web UI
One mistake I made was assuming a flow would be executable right after registration. The step I missed was starting a Prefect Agent on my local computer. The Agent queries Prefect Cloud for scheduled flow runs. If there are any, the agent fetches the flow metadata (the location of the script, for example) and kicks off execution. Without an agent, the execution layer is missing and there’s nothing to execute a flow. (more information in Prefect Agent docs)
How a local agent looks in my terminal - please never share your Prefect Cloud API key; my key was included for informational purposes and has been revoked so it is no longer usable
Agents are a critical part of Prefect’s hybrid model, which allows workflow orchestration without sharing your code or data. Through its Agent design, Prefect can manage flows on Prefect Cloud without needing access to your data or code.
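The division of labor between the cloud and the agent can be pictured with a toy model. This is only a conceptual sketch, not Prefect's implementation or API: `ToyCloud`, `agent_poll_once`, and the state names are invented for illustration. The point it tries to show is that the cloud side holds metadata about scheduled runs, while the code itself lives and runs only where the agent is.

```python
from collections import deque


class ToyCloud:
    """Holds only metadata about scheduled runs, never the flow code itself."""

    def __init__(self):
        self._scheduled = deque()
        self.states = {}

    def schedule(self, flow_name):
        self._scheduled.append(flow_name)
        self.states[flow_name] = "Scheduled"

    def next_run(self):
        """Hand out the oldest scheduled run, or None if there is no work."""
        return self._scheduled.popleft() if self._scheduled else None

    def report(self, flow_name, state):
        self.states[flow_name] = state


def agent_poll_once(cloud, local_flows):
    """One polling cycle: ask for work, run it locally, report the state back."""
    flow_name = cloud.next_run()
    if flow_name is None:
        return None  # nothing scheduled; a real agent would wait and poll again
    try:
        local_flows[flow_name]()  # the code lives on the agent's machine
        state = "Success"
    except Exception:
        state = "Failed"
    cloud.report(flow_name, state)
    return state
```

In this model, a flow that is scheduled but never polled simply stays in the "Scheduled" state, which mirrors the mistake described above: registration alone does not execute anything.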
Monitor your flow (the other nice stuff)
Once the flow is registered with Prefect Cloud and has an available agent for execution, we can hit the Quick Run button to start a new flow run -
Or view the flow schematic to understand its task dependencies -
Or set a schedule -
Or create an automation to trigger a notification if the flow fails -
Back to the bike data fetching problem
So far, we’ve seen how Prefect Core and Prefect Cloud together can manage Python execution. In our case, building a system that fetches the bike data at a five-minute interval solves the data sourcing problem.
After running it for more than a month, I had collected daily bike data and was confident that my pipeline was being monitored, so I would get a message in case of any failure.
Result - data app demo
Using this collected data, I was able to build a data app that visualizes the availability of the bikes. This felt like a huge step for me, as I had previously only worked with ready-to-use data and had never collected data from a source myself.
Some final technical notes on flow deployment
I deployed the code to DigitalOcean, a self-service cloud computing provider, and set up an agent there. This way, the code kept running even after I closed my computer.
I also used tmux for running the agent, so I can safely disconnect my SSH session from the virtual machine and keep the terminal session going. This is an essential trick because the agent has to stay running to keep communicating with Prefect Cloud.
Finally, the data app is built with Streamlit - a package that turns your data scripts into web apps. Streamlit apps have great potential to extend into useful mobility apps (such as a basic Citymapper web app), or to create interactive interfaces that provide insight into bike demand.
Overall, the above stack allows a Python coder to collect data, build interesting apps, and manage them with ease.
As a developer with no background in data pipelines, I was able to pick up Prefect along with other modern tools to build my data pipeline. I found the learning experience empowering, and hope you also get inspired!
If you have any questions, feel free to DM me on Twitter