Workflow orchestration platforms have long made it possible to manage task dependencies within individual data pipelines. That's a good start, but what if you have dependencies between data pipelines?
Say you have some flows or directed acyclic graphs (DAGs) that ingest operational data from various sources into the staging area of your data warehouse. You then want to build some business logic downstream, but only after the previous pipelines are completed.
Many frameworks either try to avoid the issue or provide half-baked solutions, such as offering only a visual grouping of tasks without actually treating those as individual first-class objects. But the underlying problem is real, and it affects nearly every company doing analytics at scale.
Describing the problem
Imagine the following use case: you want to run data transformation jobs with the data build tool called dbt. Before you can do that, you need to ensure that all extract and load (EL) pipelines have finished. Otherwise, your dbt models that depend on the staging area would be built upon stale data. You can’t run transformations on data that hasn’t arrived yet.
While it’s possible to put all the EL jobs and the dbt run task into a single data pipeline, you wouldn’t want to do this for a few reasons.
First, the EL jobs and dbt transformations are usually written and maintained by different teams (data engineers and analytics engineers).
Second, you lose the possibility to run each of those jobs individually when an ad-hoc run is needed.
Third, you end up with one giant pipeline, which becomes difficult to maintain in the long run and causes conflicts during development.
The desired solution
The target outcome is to keep each data pipeline independent while still allowing it to be called from any parent pipeline as needed. Additionally, the implementation must be able to detect when child flows finish their execution, so that we don't prematurely start any task whose upstream dependencies haven't finished yet.
Let’s look at how this pattern can be implemented using Prefect.
Demo: orchestrating EL pipelines and dbt transformations with Prefect
This demo is based on the jaffle shop scenario from dbt. To provide a realistic example, raw data is ingested into a staging area through an individual data pipeline, separate from dbt. We'll also include a flow that gets triggered after dbt transformations finish, reflecting a real-life application that needs to do something with this data, such as feeding it into business reports.
Preparing the environment
To make this demo easy to replicate on your machine, clone this repository, then install the required dependencies (ideally in a new virtual environment):
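The exact install command isn't reproduced here; assuming the repository ships a standard Python package definition, an editable install would look like this:

```shell
# Create and activate a fresh virtual environment, then install in editable mode
python -m venv .venv
source .venv/bin/activate
pip install -e .
```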
Installing in editable mode means that if we make any changes, they are applied directly without having to re-install the package.
For reproducibility, this demo uses a Postgres database running in a Docker container. To follow along with this tutorial, make sure to start a Postgres container:
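The container settings below are assumptions for a minimal local setup; the credentials are placeholders matching the rest of the demo:

```shell
# Run a local Postgres container with placeholder credentials for the demo
docker run -d --name jaffle-postgres \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=postgres \
  -p 5432:5432 \
  postgres:13
```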
Since you will need the credentials to this database in the dbt flow, set those in the Prefect Cloud UI.
If you prefer setting Secrets through code, you can use the following environment variables. However, make sure that those are also set in the terminal session from which you start your local agent:
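Prefect reads Secrets from environment variables following the `PREFECT__CONTEXT__SECRETS__<NAME>` convention; the secret names and values below are assumptions matching the demo container:

```shell
# Secret names here are assumptions for this demo; adjust to your flow's code
export PREFECT__CONTEXT__SECRETS__POSTGRES_USER=postgres
export PREFECT__CONTEXT__SECRETS__POSTGRES_PASSWORD=postgres
export PREFECT__CONTEXT__SECRETS__POSTGRES_HOST=localhost
export PREFECT__CONTEXT__SECRETS__POSTGRES_PORT=5432
```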
You can optionally also set the corresponding database credentials (hostname, port, username, password) in your dbt profiles configuration at /Users/your_user_name/.dbt/profiles.yml. Here is what it looks like:
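A minimal sketch of that profile for a local Postgres target; the profile name, schema, and credentials are assumptions mirroring the demo container:

```yaml
# Profile name and connection values are placeholders for this demo
jaffle_shop:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: postgres
      password: postgres
      dbname: postgres
      schema: jaffle_shop
      threads: 4
```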
Extract and load flow
The extract and load flow should reflect a stage where possibly multiple tasks in a data pipeline are extracting raw data from a variety of systems and loading it into your data warehouse. If you leverage an EL tool such as Fivetran or Airbyte, this job could include various tasks that trigger sync jobs for the respective connectors.
Let’s look at the demo flow:
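The original listing isn't reproduced here; a minimal sketch of such a flow, assuming Prefect 1.x, follows. The storage repository, flow name, and dataset names are placeholders, and `load_data` stands in for the helper from the `flow_utilities` package:

```python
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import LocalRun
from prefect.storage import GitHub


# Stand-in for the load helper from the flow_utilities package, which
# handles authentication and the load into the data warehouse
@task(log_stdout=True)
def load_data(dataset: str) -> None:
    print(f"Extracting and loading {dataset}...")


with Flow(
    "01_extract_load",
    storage=GitHub(repo="your-org/your-repo", path="flows/extract_load.py"),
    run_config=LocalRun(labels=["dev"]),
    executor=LocalDaskExecutor(),
) as flow:
    # .map() creates one task run per dataset;
    # the Dask executor runs them in parallel
    load_data.map(["raw_customers", "raw_orders", "raw_payments"])
```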
In a real-world scenario, you'll likely have some custom modules that you may want to reuse across several tasks and flows. That's why the flow imports a function from a flow_utilities package. This function handles authentication and the load process to the data warehouse.
Note that this example configures a storage mechanism for the flow. Storage is an abstraction that defines where Prefect can find your flow definition. Examples of storage include a Git repository, a cloud storage object, a local file, or a Docker image. This configuration is required because of the hybrid execution model, which keeps your code and data private while still taking full advantage of a managed workflow orchestration service.
Apart from storage, you also need to configure a run configuration. This demo uses a local agent with the label dev; LocalRun(labels=["dev"]) specifies that this flow should be picked up for execution by a local agent carrying that label.
Start this agent in your terminal using the following Prefect CLI command:
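For a local agent with the dev label, that looks like:

```shell
# Start a local agent that polls for flow runs labeled "dev"
prefect agent local start --label dev
```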
To create a project and register the flow, use the following Prefect CLI commands:
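The project name and flow file path below are assumptions for this demo:

```shell
# Create a project, then register the flow into it
prefect create project "jaffle_shop"
prefect register --project jaffle_shop --path flows/extract_load.py
```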
The registration command will output a flow ID. Use this ID to trigger the flow directly from the CLI:
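For example (keep the flow ID placeholder until you have your own):

```shell
# Trigger a run of the registered flow and stream its state and logs
prefect run --id <flow_id> --watch
```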
💡 Benefits of Prefect: note that you were able to execute the load process for all three datasets in parallel, simply by using .map() and attaching a LocalDaskExecutor to the flow.
If your previous flow ran successfully, you should now have three new tables in the staging schema of your Postgres database.
If you can see those tables, it means that you can proceed with dbt transformations. Here is the flow you can use:
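The original listing isn't reproduced here; the following is a sketch of such a flow under stated assumptions: Prefect 1.x, a hypothetical dbt repository URL, an assumed profile name jaffle_shop, and a secret named POSTGRES_PASSWORD. Adjust all of these to your setup:

```python
from prefect import Flow, Parameter, task
from prefect.tasks.dbt import DbtShellTask
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.shell import ShellTask

# Shell tasks used to clone and clean up the dbt repository
clone_repo = ShellTask(name="clone_dbt_repo")
delete_repo = ShellTask(name="delete_dbt_repo")

# DbtShellTask generates a profiles.yml from dbt_kwargs and runs dbt
# commands; profile and target names here are assumptions
dbt = DbtShellTask(
    profile_name="jaffle_shop",
    environment="dev",
    profiles_dir=".",
    overwrite_profiles=True,
    helper_script="cd dbt_project",
    log_stdout=True,
    return_all=True,
)


@task
def build_clone_command(url: str, branch: str) -> str:
    return f"git clone -b {branch} {url} dbt_project"


@task
def build_dbt_kwargs(password: str) -> dict:
    # Connection details mirror the local Postgres container
    return {
        "type": "postgres",
        "host": "localhost",
        "port": 5432,
        "user": "postgres",
        "password": password,
        "dbname": "postgres",
        "schema": "jaffle_shop",
        "threads": 4,
    }


with Flow("02_dbt") as flow:
    repo_url = Parameter("dbt_repo_url", default="https://github.com/your-org/jaffle-shop-dbt")
    branch = Parameter("branch", default="main")
    password = PrefectSecret("POSTGRES_PASSWORD")

    # Remove any leftover clone from a previous failed run, then clone fresh
    cleanup_old = delete_repo(command="rm -rf dbt_project")
    clone = clone_repo(
        command=build_clone_command(repo_url, branch), upstream_tasks=[cleanup_old]
    )

    dbt_kwargs = build_dbt_kwargs(password)
    dbt_run = dbt(command="dbt run", dbt_kwargs=dbt_kwargs, upstream_tasks=[clone])
    dbt_test = dbt(command="dbt test", dbt_kwargs=dbt_kwargs, upstream_tasks=[dbt_run])

    # Clean up the cloned repository once everything has finished
    delete_repo(command="rm -rf dbt_project", upstream_tasks=[dbt_test])
```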
To understand the flow better, it makes sense to run flow.visualize(). This command generates a visualization of the flow's computational graph.
Most of the configuration happens within the initialization of the DbtShellTask. This task is where you configure the dbt project, profile, environment, and database connection details.
In the flow, we:
Delete the dbt folder if it exists in the current repository (there is a chance it could still be there from a previous run that might have failed mid-run).
Pass the dbt repository URL and the branch name to make it flexible. Adding those parameters allows us to reuse the same flow for development and production by leveraging the branch name corresponding to each environment.
Clone the dbt repository into a DBT_PROJECT folder.
Retrieve the dbt database credentials in a secure manner.
Once all the preparation steps are done, execute the dbt commands: dbt run and dbt test.
Ensure that our dbt logs get forwarded to the UI, and delete the dbt repository once everything is finished.
Once you run this flow, you should see many new tables created by dbt transformations in your Postgres database. The log output will also confirm whether dbt data validation tests have passed.
💡 Why do we clone the dbt repository on each run? In many organizations, dbt transformations are written by a different team than the one building data pipelines. Separating flows and dbt transformations into individual repositories allows for a smooth handover between teams, so that the development of orchestration pipelines by data engineers doesn't block analytics engineers, and vice versa.
This flow reflects anything that you may want to do once all your data preparation is finished (i.e., all Extracts, Loads, and Transformations):
You may want to refresh your Tableau extracts to ensure your dashboards are based on the most recent data.
You can build and send custom reports.
You can feed the preprocessed data into downstream applications.
You can even start some reverse ETL jobs.
Let’s look at the final child flow:
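The listing isn't reproduced here; since the post describes this flow as a placeholder updating two dashboards, a minimal sketch (flow and task names are assumptions) could be:

```python
from prefect import Flow, task


@task(log_stdout=True)
def update_dashboard(name: str) -> None:
    # Placeholder: refresh a Tableau extract, rebuild a report,
    # or feed data into a downstream application
    print(f"Refreshing dashboard: {name}")


with Flow("03_update_dashboards") as flow:
    # One task run per dashboard that depends on the transformed data
    update_dashboard.map(["customers", "sales"])
```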
This flow is mostly a placeholder: you can use it as a template for your specific use case. There are two dashboards (customers and sales) for this demo that need to be updated based on the most recently loaded and transformed data from the previous steps.
A flow of flows: where the real orchestration happens
At this point, you have an extract-and-load process that orchestrates raw data extracts into a staging area. You've also built a flow that triggers dbt transformations. Finally, there is a flow that consumes the transformed data and feeds it into dashboards and downstream applications.
It's time to put it all together into a parent flow that orchestrates all steps and ensures that if something goes wrong, you can quickly and reliably tell which step failed and why.
Here is the final flow:
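A sketch of such a parent flow, assuming Prefect 1.x and the child flow and project names used as placeholders throughout this demo:

```python
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

PROJECT = "jaffle_shop"  # assumed project name

with Flow("parent_flow") as flow:
    # Create a run of each child flow, then wait for it to finish;
    # raise_final_state=True fails the parent if the child fails
    el_run = create_flow_run(flow_name="01_extract_load", project_name=PROJECT)
    el_done = wait_for_flow_run(el_run, raise_final_state=True, stream_logs=True)

    dbt_run = create_flow_run(flow_name="02_dbt", project_name=PROJECT)
    dbt_done = wait_for_flow_run(dbt_run, raise_final_state=True, stream_logs=True)

    dash_run = create_flow_run(flow_name="03_update_dashboards", project_name=PROJECT)
    wait_for_flow_run(dash_run, raise_final_state=True, stream_logs=True)

    # Don't create a child run until the previous child has finished
    dbt_run.set_upstream(el_done)
    dash_run.set_upstream(dbt_done)
```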
The flow does the following:
Creating a flow run for the child flow using the create_flow_run task.
Waiting until this child flow finishes its execution using the wait_for_flow_run task.
Setting dependencies to ensure that tasks run in the correct order, including failing the entire parent flow if any child flow fails, thanks to the raise_final_state=True flag. Note that this flag was introduced in Prefect version 0.15.9.
The creation of a flow run (create_flow_run) and waiting for its completion (wait_for_flow_run) are separated into two individual tasks to ensure separation of concerns. However, if you find that too verbose, Prefect also offers StartFlowRun, a task that provides a simpler (yet slightly less powerful) abstraction. Here is how you could use StartFlowRun for the same use case:
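A sketch of the StartFlowRun variant, under the same placeholder flow and project names:

```python
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# wait=True makes each task block until its child flow run finishes
start_flow_run = StartFlowRun(project_name="jaffle_shop", wait=True)

with Flow("parent_flow") as flow:
    # One line per child flow; ordering is set via upstream_tasks
    el = start_flow_run(flow_name="01_extract_load")
    dbt = start_flow_run(flow_name="02_dbt", upstream_tasks=[el])
    start_flow_run(flow_name="03_update_dashboards", upstream_tasks=[dbt])
```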
Notice that it only requires one line per child flow, since most of the configuration is performed at the task's initialization.
Extending the flow of flows to new use cases
But what if you want to trigger multiple flow runs in parallel? While this is not required for this demo, you may encounter a use case where, for instance, the staging-area flow is itself a flow-of-flows. Why? Since the staging area is usually a starting point, those processes have no upstream dependencies, making them ideal candidates for parallel execution.
Here is how this scenario could be translated into Prefect:
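A sketch of the parallel variant: the child flow names below are hypothetical ingestion flows, and mapping over create_flow_run combined with a Dask executor lets the runs start concurrently:

```python
from prefect import Flow, unmapped
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# Hypothetical child flows that each load one source into the staging area
STAGING_FLOWS = ["ingest_crm", "ingest_erp", "ingest_web_events"]

with Flow("staging_area", executor=LocalDaskExecutor()) as flow:
    # One flow run per staging source, created and awaited in parallel
    run_ids = create_flow_run.map(
        flow_name=STAGING_FLOWS,
        project_name=unmapped("jaffle_shop"),
    )
    wait_for_flow_run.map(run_ids, raise_final_state=unmapped(True))
```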
Note that you have to register all child flows before you use them in a parent flow.
This post discussed how to manage dependencies between data pipelines. Defining all dependencies in a single giant workflow is no longer feasible when building analytics at scale, and you saw how to solve this problem in Prefect: a parent flow can govern dependencies not only between tasks but also between independent flows. You can even execute multiple flows in parallel and offload their execution to a distributed Dask cluster.
There are many ways you could extend this scenario, and we'll do that in future articles:
Modify the code to be executed against a cloud data warehouse such as Snowflake rather than a local Postgres database.
Package the entire project into a Docker image so that all those flows can run on a Kubernetes cluster on AWS.
Build a CI/CD pipeline to ensure a repeatable deployment process.
Change the run configuration or parameter values of the child flows in a flow-of-flows.
Build a state handler to ensure you'll get notified when a parent pipeline finishes execution.
If anything about what we've discussed in this post is unclear, feel free to ask your questions in our community Slack.
We can’t wait to see what you build. Thanks for reading, and happy engineering!