This is the third post in a series of articles about orchestrating ELT data pipelines with Prefect. The first post dealt with organizing a project and orchestrating flows in a local environment. The second one discussed deploying the ELT project to Snowflake and AWS EKS, and building a CI/CD process. In this post, we’ll look at additional use cases: triggering dbt Cloud jobs, adding notifications, and reusing database connections across tasks.
Configuring notifications in Prefect
There are several ways of configuring custom notifications on state changes, but we’ll look at two of them: automations and state handlers.
Automations can be configured directly from the UI. For instance, the animation below shows how to set up an email notification on all failed flows. This is especially useful if you prefer to set it up once upfront, rather than configuring it independently on each flow.
By following the same steps, you can configure Teams or Slack alerts, or report failed flow runs as PagerDuty incidents. Automation actions even let you send the notification to an arbitrary webhook, which opens the door to all sorts of automated workflows.
State handlers are based on the same principle as automations — they fire on specific state transitions. However, their underlying logic must be explicitly defined within your flow code.
For instance, the Gist below defines a state handler that sends a Slack message any time the flow run reaches a Failed state. You can see that a task-level state handler is a function that takes a task object, as well as the old and new task-run states, as input and returns the new state as output. Before returning the new state to downstream tasks, it can perform some action, such as sending a Slack message that includes the reason why the task failed and when it happened.
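As a minimal sketch of that handler shape (not the Gist from this post), the function below follows the (task, old_state, new_state) signature and relies only on is_failed() and message, which Prefect 1.x state objects expose. The names make_failure_handler and post_to_slack, as well as the idea of injecting the sender, are illustrative assumptions.

```python
import json
import urllib.request
from datetime import datetime, timezone


def post_to_slack(webhook_url: str, text: str) -> None:
    """POST a plain-text message to a Slack incoming webhook (URL is yours to supply)."""
    payload = json.dumps({"text": text}).encode("utf-8")
    request = urllib.request.Request(
        webhook_url, data=payload, headers={"Content-Type": "application/json"}
    )
    urllib.request.urlopen(request, timeout=10)


def make_failure_handler(send):
    """Build a state handler; `send` is any callable that accepts the message text."""

    def notify_on_failure(task, old_state, new_state):
        # Act only on the transition we care about.
        if new_state.is_failed():
            when = datetime.now(timezone.utc).isoformat(timespec="seconds")
            send(f"Task {task.name!r} failed at {when}: {new_state.message}")
        # Always hand the new state back so the run continues as usual.
        return new_state

    return notify_on_failure
```

In a Prefect 1.x flow, a handler like this is attached through the state_handlers argument, for example @task(state_handlers=[handler]), where handler could be built as make_failure_handler(lambda text: post_to_slack(url, text)).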
A resource manager allows you to reuse database connections and other resources across tasks. It acts as a context manager around your tasks. To configure it, you need to build a class with a setup and cleanup method. Under the hood, Prefect will create special tasks for those methods:
setup should contain code that creates resources such as database connections, Spark sessions, etc.
cleanup is invoked when your tasks exit the context manager.
To declare this class to be a resource manager, you need to add a resource_manager decorator. Here is how a resource manager for a Snowflake database connection can look:
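The sketch below shows only the setup/cleanup contract, under two loud assumptions: sqlite3 stands in for the Snowflake connection so that the code runs anywhere, and the resource_manager decorator is left out so that Prefect does not need to be installed. The class name DatabaseConnection is hypothetical.

```python
import sqlite3


class DatabaseConnection:
    """Has the two methods a Prefect 1.x resource manager class needs.

    Assumption: sqlite3 replaces the Snowflake driver for illustration; with
    Prefect installed, the class would also carry the resource_manager decorator.
    """

    def __init__(self, database: str = ":memory:"):
        self.database = database

    def setup(self) -> sqlite3.Connection:
        # Create the resource; the returned object is what tasks receive.
        return sqlite3.connect(self.database)

    def cleanup(self, connection: sqlite3.Connection) -> None:
        # Called with the object that setup() returned; release it here.
        connection.close()


# Exercising both methods by hand, in the order an orchestrator would call them.
manager = DatabaseConnection()
connection = manager.setup()
try:
    answer = connection.execute("SELECT 40 + 2").fetchone()[0]
finally:
    manager.cleanup(connection)
```

Note that cleanup receives whatever setup returned, so the class itself does not need to store the connection.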
Within your flow definition, you can use it as a context manager around tasks that rely on that resource. In the example below, the database connection is reused across tasks extracting data from this database (lines 43–44). You can define tasks that no longer need this resource outside of the context manager (lines 45–46).
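To make the ordering concrete, here is a plain-Python sketch of what wrapping tasks in such a context manager amounts to. It does not use Prefect; RecordingResource, managed, and run_pipeline are hypothetical names, and the string "connection" is a placeholder for a real connection object.

```python
from contextlib import contextmanager


class RecordingResource:
    """Hypothetical resource manager that logs its lifecycle to a list."""

    def __init__(self, events):
        self.events = events

    def setup(self):
        self.events.append("setup")
        return "connection"  # placeholder for a real connection object

    def cleanup(self, resource):
        self.events.append("cleanup")


@contextmanager
def managed(manager):
    # setup() runs on entry; cleanup() runs on exit, even if a step fails.
    resource = manager.setup()
    try:
        yield resource
    finally:
        manager.cleanup(resource)


def run_pipeline():
    events = []
    with managed(RecordingResource(events)) as conn:
        # Both extract steps share the same resource.
        events.append(f"extract customers via {conn}")
        events.append(f"extract orders via {conn}")
    # Steps that no longer need the connection run after cleanup.
    events.append("transform")
    return events
```

In Prefect 1.x, the decorated class plays the role of managed: it is used directly in a with statement inside the flow block, and Prefect schedules the setup and cleanup tasks around the tasks declared within it.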
💡 Can a resource manager be used with mapping? If you want to parallelize database operations using mapping, you should open and close the database connection inside of a task instead of relying on a resource manager. A resource manager wouldn’t work well in this case, since a database connection cannot be shared across threads or processes that may even run on entirely different worker nodes. Mapped tasks must be completely self-contained so that each child task can be executed and retried independently of other tasks or resource constraints.
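A self-contained unit of work could look like the sketch below. It rests on assumptions: sqlite3 again stands in for Snowflake, ThreadPoolExecutor.map stands in for Prefect mapping, and count_rows and count_all are hypothetical names.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing


def count_rows(database_path: str, table: str) -> int:
    # The connection is opened and closed inside the unit of work, so every
    # call can run, fail, and be retried without depending on any other call.
    with closing(sqlite3.connect(database_path)) as connection:
        # Table names cannot be bound as parameters; pass trusted names only.
        query = f'SELECT COUNT(*) FROM "{table}"'
        return connection.execute(query).fetchone()[0]


def count_all(database_path: str, tables: list) -> list:
    # Stand-in for mapping: each table is handled by an independent call.
    with ThreadPoolExecutor(max_workers=2) as pool:
        return list(pool.map(lambda table: count_rows(database_path, table), tables))
```

The price is one connection per mapped child, which is usually acceptable compared with the risk of sharing a connection across workers.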
So far, the dbt workflow shown in the previous articles assumed that you use the open-source version of dbt. However, if you’re already using dbt Cloud, you may prefer to trigger a dbt Cloud job instead. Let’s look at how you can approach that.
1. Create a new dbt project
First, let’s build a new dbt project and create a data warehouse connection. Once you are logged into your dbt Cloud account, go to New Project:
You can then follow this three-step setup:
Create a project,
Set up a database connection,
Add a Git repository.
First, set a name for the project:
Second, configure and test a database connection to your data warehouse — this includes entering your account name, your database and the name of your compute warehouse along with your Snowflake user name and password:
Third, choose a repository — we selected the one that we were using in the previous posts in this series:
Finally, we can start developing dbt models:
To learn how to develop dbt models, check out the dbt documentation. Next, let’s look at how to create a job and schedule it with Prefect.
2. Define a dbt Cloud environment
Before we can create a job, we need an environment for it to run in. To configure it, go to “New Environment”:
Then, fill out the required database details and credentials. Those details are needed because you may have one job that runs in a development database or schema and another one that runs in production.
Once we click on “Save”, we are directed to a screen from which we can create a new job.
3. Create a dbt Cloud job
Within the “Environments” page, create a “New Job”.
You can see that the environment has been preselected. In the “Commands” section, configure all dbt CLI commands you want to run, and save the job.
As soon as you save the job, you land on the “Jobs” page for this specific environment. At this point, write down the account ID and the job ID from the URL on this page. We’ll need both later to trigger dbt Cloud jobs from a Prefect flow.
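If you would rather not copy the IDs by hand, a small helper can pull them out of the URL. This is a sketch that assumes the URL contains /accounts/<number> and /jobs/<number> segments; the function name and the sample URL in the usage note are made up for illustration.

```python
import re


def ids_from_job_url(url: str) -> dict:
    """Extract the account ID and job ID from a dbt Cloud job URL.

    Assumption: the URL contains '/accounts/<digits>' and '/jobs/<digits>'.
    """
    account = re.search(r"/accounts/(\d+)", url)
    job = re.search(r"/jobs/(\d+)", url)
    if account is None or job is None:
        raise ValueError(f"Could not find account and job IDs in: {url}")
    return {"account_id": int(account.group(1)), "job_id": int(job.group(1))}
```

For example, calling it with the illustrative URL https://cloud.getdbt.com/#/accounts/12345/projects/678/jobs/910/ gives account_id 12345 and job_id 910.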
4. Generate an API key
Before you can run a Prefect flow that will trigger a dbt Cloud job, you need to generate an API key in your account settings:
Ideally, add this token directly in the Prefect Cloud UI as a secret:
Then you can use the token, as well as the job_id, in the following flow.
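To show what these three values are used for, here is a sketch of the HTTP request that triggers a job run. It assumes the dbt Cloud API v2 "trigger job run" endpoint on the default cloud.getdbt.com host and token-based authorization; the function name and the default cause text are hypothetical, and the sketch builds the request without sending it.

```python
import json
import urllib.request


def build_trigger_request(
    account_id: int,
    job_id: int,
    token: str,
    cause: str = "Triggered from a Prefect flow",
    host: str = "cloud.getdbt.com",
) -> urllib.request.Request:
    """Build (but do not send) the request that starts a dbt Cloud job run."""
    url = f"https://{host}/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
    # The API expects a short, human-readable reason for the run.
    body = json.dumps({"cause": cause}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Token {token}",
            "Content-Type": "application/json",
        },
    )
```

Passing the result to urllib.request.urlopen would send it. Inside a flow, the task from the Task Library takes care of this step for you, so the sketch is mainly useful for testing the token and IDs in isolation.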
When the flow run is finished, you can explore the links provided in the Artifacts tab:
You can either download those or open the links directly in your browser. For example, when you visit the last link (run_results.json), you can see the output of the run, such as passed dbt tests:
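If you prefer a quick summary over reading the raw file, a few lines of Python can tally the outcomes. The sketch assumes that the artifact has a top-level results list whose entries each carry a status field, as in recent dbt artifact schemas; the function name is hypothetical.

```python
import json
from collections import Counter


def summarize_run_results(raw_json: str) -> Counter:
    """Count how many nodes ended in each status (e.g. success, pass, fail)."""
    document = json.loads(raw_json)
    # Assumption: every entry in "results" has a "status" key.
    return Counter(result["status"] for result in document.get("results", []))
```

You would call it with the text of the downloaded artifact, for example summarize_run_results(open("run_results.json").read()).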
If you install the JSON Viewer Chrome extension, your browser will render the downloaded JSON artifacts in a human-readable format.
This post covered how to reuse resources such as a Snowflake database connection across tasks in your ELT data pipeline. It discussed two ways of configuring custom notifications, either via automations in the UI or via state handlers in the flow code. Finally, it introduced the new DbtCloudRunJob task from the Task Library and demonstrated how it can be used in a flow.
If anything about what we’ve discussed in this post is unclear, feel free to ask a question in our community Slack.
We can’t wait to see what you build.
Thanks for reading, and happy engineering!