
Jan. 4, 2022
Orchestrating ELT with Prefect, dbt Cloud, and Snowflake (Part 3)
How to use Prefect and dbt Cloud with a Snowflake data warehouse
This is the third post in a series of articles about orchestrating ELT data pipelines with Prefect. The first post dealt with organizing a project and orchestrating flows in a local environment. The second covered deploying the ELT project to Snowflake and AWS EKS, and building a CI/CD process. In this post, we’ll look at additional use cases around dbt Cloud, adding notifications, and managing database connections across tasks.
Configuring notifications in Prefect
There are several ways of configuring custom notifications on state changes, but we’ll look at two of them: automations and state handlers.
Automations
Automations can be configured directly from the UI. For instance, the animation below shows how to set up an email notification for all failed flow runs. This is especially useful if you prefer to set things up once upfront, rather than configuring notifications separately for each flow.

The Prefect Cloud UI showing the "New Automation" screen
By following the same steps, you can configure Teams or Slack alerts, or report failed flow runs as PagerDuty incidents. Automation actions even allow you to send the notification to an arbitrary webhook, opening the door to all sorts of automated workflows.

the Prefect UI showing Automation actions
State handlers
State handlers are based on the same principle as automations — they fire on specific state transitions. However, their underlying logic must be explicitly defined within your flow code.
For instance, a state handler can send a Slack message any time a task run reaches a Failed state. A task-level state handler is a function that takes the task object, as well as the old and new task-run states, as input and returns the new state as output. Before returning the new state to downstream tasks, it can perform an action such as sending a Slack message that includes the reason the task failed and when it happened.
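Here is a minimal sketch of such a handler, assuming the Slack webhook URL is stored as a Prefect Secret named SLACK_WEBHOOK_URL (a name chosen for this example):

```python
import requests

import prefect
from prefect import task
from prefect.client import Secret


def post_to_slack_on_failure(task, old_state, new_state):
    """Send a Slack message whenever the task enters a Failed state."""
    if new_state.is_failed():
        # "SLACK_WEBHOOK_URL" is a Secret name chosen for illustration
        webhook_url = Secret("SLACK_WEBHOOK_URL").get()
        message = (
            f"Task {task.name} failed at {prefect.context.get('date')}: "
            f"{new_state.message}"
        )
        requests.post(webhook_url, json={"text": message})
    return new_state


@task(state_handlers=[post_to_slack_on_failure])
def extract():
    ...
```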
Resource manager
A resource manager allows you to reuse database connections and other resources across tasks. It acts as a context manager around your tasks. To configure it, you build a class with setup and cleanup methods. Under the hood, Prefect creates special tasks for those methods:
setup() should contain code that creates resources such as database connections, Spark sessions, etc.
cleanup() is invoked when your tasks exit the context manager.
To declare this class to be a resource manager, you need to add a resource_manager decorator. Here is how a resource manager for a Snowflake database connection can look.
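The sketch below assumes the connection details are passed in directly; in practice, you would likely pull them from Prefect Secrets or environment variables:

```python
import snowflake.connector
from prefect import resource_manager


@resource_manager
class SnowflakeConnection:
    def __init__(self, account: str, user: str, password: str, database: str):
        self.account = account
        self.user = user
        self.password = password
        self.database = database

    def setup(self):
        # The return value becomes the shared resource passed to your tasks
        return snowflake.connector.connect(
            account=self.account,
            user=self.user,
            password=self.password,
            database=self.database,
        )

    def cleanup(self, connection):
        # Called automatically when the tasks exit the context manager
        connection.close()
```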
Within your flow definition, you can use it as a context manager around the tasks that rely on that resource. In the example below, the database connection is reused across the tasks that extract data from the database, while tasks that no longer need this resource are defined outside of the context manager.
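A sketch of that flow structure, using the SnowflakeConnection resource manager defined above (the table name raw.orders and the credentials are placeholders):

```python
from prefect import Flow, task


@task
def extract_orders(connection):
    # Reuses the shared connection created by the resource manager
    return connection.cursor().execute("SELECT * FROM raw.orders").fetchall()


@task
def transform(rows):
    return [row for row in rows if row is not None]


with Flow("snowflake-elt") as flow:
    with SnowflakeConnection(
        account="your_account",
        user="your_user",
        password="your_password",
        database="your_db",
    ) as conn:
        orders = extract_orders(conn)
    # Tasks that no longer need the connection live outside the block
    cleaned = transform(orders)
```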
💡 Can a resource manager be used with mapping? If you want to parallelize database operations using mapping, you should open and close the database connection inside a task instead of relying on a resource manager, as shown in the sketch below. A resource manager wouldn’t work well in this case, since a database connection cannot be shared across threads or processes that may run on entirely different worker nodes. Mapped tasks must be completely self-contained so that each child task can be executed and retried independently of other tasks or resource constraints.
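For illustration, a mapped task that manages its own connection might look like this (credentials and table names are placeholders):

```python
import snowflake.connector
from prefect import Flow, task


@task
def row_count(table_name: str) -> int:
    # Each mapped child task opens and closes its own connection,
    # so it stays self-contained and can be retried independently
    conn = snowflake.connector.connect(
        account="your_account",
        user="your_user",
        password="your_password",
        database="your_db",
    )
    try:
        return conn.cursor().execute(
            f"SELECT COUNT(*) FROM {table_name}"
        ).fetchone()[0]
    finally:
        conn.close()


with Flow("mapped-row-counts") as flow:
    counts = row_count.map(table_name=["orders", "customers", "payments"])
```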
dbt Cloud
So far, the dbt workflow shown in the previous articles assumed that you use the open-source version of dbt. However, if you’re already using dbt Cloud, you may prefer to trigger a dbt Cloud job instead. Let’s look at how you can approach that.
1. Create a new dbt project
First, let’s build a new dbt project and create a data warehouse connection. Once you are logged into your dbt Cloud account, go to “New Project”:

dbt UI showing new project creation
You can then follow this three-step setup:
Create a project,
Set up a database connection,
Add a Git repository.

The three-step project setup in dbt Cloud
First, set a name for the project:

dbt Cloud UI project setting screen; name field
Second, configure and test a database connection to your data warehouse. This includes entering your account name, your database, and the name of your compute warehouse, along with your Snowflake username and password:

dbt Cloud UI showing "set up a database connection" screen, setting up a snowflake database
Third, choose a repository — we selected the one that we were using in the previous posts in this series:

dbt Cloud UI; set up a repository screen
Finally, we can start developing dbt models:

dbt Cloud UI: welcome to Prefect (Partner) on dbt Cloud!
To learn how to develop dbt models, check out the dbt documentation. Let’s look at how to create a job and schedule it with Prefect.
2. Define a dbt Cloud environment
Before we can create a job, we need an environment for it to run in. To configure one, go to “New Environment”:

dbt Cloud: new environment
Then, fill out the required database details and credentials. These details are needed because you may have one job that runs in a development database or schema and another that runs in production.

dbt Cloud: new environment settings screen
Once we click on “Save”, we are directed to a screen from which we can create a new job.
3. Create a dbt Cloud job
Within the “Environments” page, create a “New Job”.

dbt Cloud UI: within the "Environments page," create a "New Job"
You can see that the environment has been preselected. In the “Commands” section, configure all the dbt CLI commands you want to run (for example, dbt run and dbt test), and save the job.

dbt Cloud UI: new job screen
As soon as you save the job, you land on the “Jobs” page for this specific environment. At this point, write down the account ID and the job ID from the URL of this page; we’ll need them later to trigger dbt Cloud jobs from a Prefect flow.

dbt Cloud UI: screenshot showing where to find the account ID and the job ID in the URL of the Jobs page
4. Generate an API key
Before you can run a Prefect flow that will trigger a dbt Cloud job, you need to generate an API key in your account settings:

dbt Cloud UI: Generate an API key screenshot
Ideally, add this token directly in the Prefect Cloud UI as a secret:

adding a token in the Prefect Cloud UI as a secret
Then you can use the token, as well as the account_id and job_id, in the following flow.
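A minimal sketch of such a flow, assuming the API token is stored as a Prefect Secret named DBT_CLOUD_TOKEN (a name chosen for this example) and using placeholder account and job IDs that you should replace with the values noted earlier:

```python
from prefect import Flow
from prefect.tasks.dbt import DbtCloudRunJob
from prefect.tasks.secrets import PrefectSecret

# Replace the IDs below with the ones from your dbt Cloud Jobs page URL
run_dbt_cloud_job = DbtCloudRunJob(
    account_id=12345,
    job_id=67890,
    wait_for_job_run_completion=True,
)

with Flow("dbt-cloud-elt") as flow:
    dbt_cloud_token = PrefectSecret("DBT_CLOUD_TOKEN")
    run_dbt_cloud_job(cause="Triggered from Prefect", token=dbt_cloud_token)
```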
When the flow run is finished, you can explore the links provided in the Artifacts tab:

dbt Cloud UI: Artifacts screen
You can either download those or open the links directly in your browser. For example, when you visit the last link (run_results.json), you can see the output of the run, such as passed dbt tests:

output of a run (run_results.json)
If you install the JSON Viewer Chrome extension, your browser will render the downloaded JSON artifacts in a human-readable format.
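If you’d rather inspect the artifact programmatically, a minimal sketch along these lines prints the status of each dbt node (it assumes you downloaded run_results.json locally; the exact schema depends on your dbt version):

```python
import json

# run_results.json downloaded from the links in the Artifacts tab
with open("run_results.json") as f:
    run_results = json.load(f)

for result in run_results["results"]:
    # Each entry reports a dbt node (model, test, etc.) and its status
    print(result["unique_id"], result["status"])
```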
Next Steps
This post covered how to reuse resources such as a Snowflake database connection across tasks in your ELT data pipeline. It discussed two ways of configuring custom notifications, either via automations in the UI or via state handlers in the flow code. Finally, it introduced the new DbtCloudRunJob task from the Task Library and demonstrated how it can be used in a flow.
If anything about what we’ve discussed in this post is unclear, feel free to ask a question in our community Slack.
We can’t wait to see what you build.
Thanks for reading, and happy engineering!