Orchestrating ELT with Prefect, dbt Cloud, and Snowflake (Part 3)
Jan. 4, 2022

How to use Prefect and dbt Cloud with a Snowflake data warehouse

Anna Geller, Lead Community Engineer

This is the third post in a series of articles about orchestrating ELT data pipelines with Prefect. The first post dealt with organizing a project and orchestrating flows in a local environment. The second one discussed deploying the ELT project to Snowflake and AWS EKS, and building a CI/CD process. In this post, we’ll look at additional use cases around dbt Cloud, adding notifications, and reusing database connections across tasks.

There are several ways of configuring custom notifications on state changes, but we’ll look at two of them: automations and state handlers.

Automations can be configured directly from the UI. For instance, the animation below shows how to set up an email notification on all failed flows. This is especially useful if you prefer to set it up once upfront, rather than configuring it independently on each flow.

[Image: The Prefect Cloud UI showing the "New Automation" screen]

By following the same steps, you can configure Teams or Slack alerts, or report failed flow runs as PagerDuty incidents. Automation actions even allow you to send the notification to an arbitrary webhook, opening the door to all sorts of automated workflows.

[Image: The Prefect UI showing Automation actions]

State handlers are based on the same principle as automations — they fire on specific state transitions. However, their underlying logic must be explicitly defined within your flow code.

For instance, the example below defines a state handler that sends a Slack message any time a task run reaches a Failed state. A task-level state handler is a function that takes the task object, as well as the old and new task-run states, as input and returns the new state as output. Before returning the new state to downstream tasks, it can perform an action such as sending a Slack message that includes the reason why the task failed and when it happened.
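Here is a minimal sketch of such a task-level state handler (Prefect 1.x API), assuming a Slack incoming-webhook URL is stored as a Prefect Secret named SLACK_WEBHOOK_URL; the secret name and the example task are illustrative:

```python
import datetime

import requests
from prefect import task
from prefect.client import Secret


def post_to_slack_on_failure(task, old_state, new_state):
    # Act only on the transition into a Failed state
    if new_state.is_failed():
        webhook_url = Secret("SLACK_WEBHOOK_URL").get()
        message = (
            f"Task `{task.name}` failed at "
            f"{datetime.datetime.utcnow():%Y-%m-%d %H:%M:%S} UTC. "
            f"Reason: {new_state.message}"
        )
        requests.post(webhook_url, json={"text": message})
    # Always return the new state so the flow continues as usual
    return new_state


@task(state_handlers=[post_to_slack_on_failure])
def extract_data():
    ...
```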

A resource manager allows you to reuse database connections and other resources across tasks. It acts as a context manager around your tasks. To configure it, you need to build a class with a setup and cleanup method. Under the hood, Prefect will create special tasks for those methods:

  • setup() should contain code that creates resources such as database connections, Spark sessions, etc.

  • cleanup() is invoked when your tasks exit the context manager.

To declare this class as a resource manager, you need to add the resource_manager decorator. Here is how a resource manager for a Snowflake database connection can look:
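The snippet below is a minimal sketch; the class name and the way credentials are passed in are illustrative (in practice, you would pull them from Prefect Secrets or environment variables):

```python
import snowflake.connector
from prefect import resource_manager


@resource_manager
class SnowflakeConnection:
    def __init__(self, account, user, password, database, schema, warehouse):
        self.connection_kwargs = dict(
            account=account,
            user=user,
            password=password,
            database=database,
            schema=schema,
            warehouse=warehouse,
        )

    def setup(self):
        # Whatever setup() returns is what the `with ... as conn:` block yields
        return snowflake.connector.connect(**self.connection_kwargs)

    def cleanup(self, conn):
        # Called once all tasks inside the context manager have finished
        conn.close()
```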

Within your flow definition, you can use it as a context manager around the tasks that rely on that resource. In the example below, the database connection is reused across the tasks extracting data from the database, while tasks that no longer need this resource are defined outside of the context manager.
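A sketch of the corresponding flow could look as follows; the task names, queries, and connection details are placeholders:

```python
from prefect import Flow, task


@task
def extract_orders(conn):
    return conn.cursor().execute("SELECT * FROM raw.orders").fetchall()


@task
def extract_customers(conn):
    return conn.cursor().execute("SELECT * FROM raw.customers").fetchall()


@task
def transform(orders, customers):
    ...  # downstream logic that no longer needs the connection


with Flow("snowflake-extract") as flow:
    with SnowflakeConnection(
        account="your_account",
        user="your_user",
        password="your_password",
        database="your_db",
        schema="your_schema",
        warehouse="your_warehouse",
    ) as conn:
        orders = extract_orders(conn)
        customers = extract_customers(conn)
    # Tasks defined here don't need the connection, so the resource can be
    # cleaned up as soon as the extract tasks finish
    transform(orders, customers)
```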

💡 Can a resource manager be used with mapping? If you want to parallelize database operations using mapping, you should open and close the database connection inside the task instead of relying on a resource manager, as shown in the sketch below. A resource manager wouldn’t work well in this case because a database connection cannot be shared across threads or processes that may run on entirely different worker nodes. Mapped tasks must be completely self-contained so that each child task can be executed and retried independently of other tasks or resource constraints.
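For example, a self-contained mapped task could look roughly like this; the connection details and table names are placeholders:

```python
import snowflake.connector
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor


@task
def extract_table(table_name):
    # Each mapped child task opens and closes its own connection
    conn = snowflake.connector.connect(
        account="your_account",
        user="your_user",
        password="your_password",
        database="your_db",
        schema="your_schema",
    )
    try:
        return conn.cursor().execute(f"SELECT * FROM {table_name}").fetchall()
    finally:
        conn.close()


with Flow("mapped-extracts", executor=LocalDaskExecutor()) as flow:
    extract_table.map(["orders", "customers", "payments"])
```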

So far, the dbt workflow shown in the previous articles assumed that you leverage the open-source version of dbt. However, if you’re already using dbt Cloud, you may prefer to trigger a dbt Cloud job instead. Let’s look at how you can approach that.

First, let’s build a new dbt project and create a data warehouse connection. Once you are logged into your dbt Cloud account, go to New Project:

[Image: The dbt Cloud UI showing new project creation]

You can then follow this three-step setup:

  1. Create a project,

  2. Set up a database connection,

  3. Add a Git repository.

[Image: The three-step project setup in dbt Cloud]

First, set a name for the project:

[Image: The dbt Cloud project settings screen with the name field]

Second, configure and test a database connection to your data warehouse. This includes entering your account name, your database, and the name of your compute warehouse, along with your Snowflake username and password:

[Image: The dbt Cloud "Set up a database connection" screen for a Snowflake database]

Third, choose a repository — we selected the one that we were using in the previous posts in this series:

[Image: The dbt Cloud "Set up a repository" screen]

Finally, we can start developing dbt models:

[Image: The dbt Cloud welcome screen for Prefect (Partner) on dbt Cloud]

To learn how to develop dbt models, check out the dbt documentation. Let’s look at how to create a job and schedule it with Prefect.

Before we can create a job, we need an environment for it to run in. To configure it, go to “New Environment”:

[Image: The dbt Cloud "New Environment" screen]

Then, fill out the required database details and credentials. Those details are needed because you may have one job that runs in a development database or schema and another one in production.

[Image: The dbt Cloud new environment settings screen]

Once we click on “Save”, we are directed to a screen from which we can create a new job.

Within the “Environments” page, create a “New Job”.

[Image: The dbt Cloud "Environments" page with the "New Job" button]

You can see that the environment has been preselected. In the “Commands” section, configure all dbt CLI commands you want to run, and save the job.

[Image: The dbt Cloud new job screen]

As soon as you save the job, you land on the “Jobs” page for this specific environment. At this point, write down the account ID and the job ID from the URL on this page — we’ll need that later to trigger dbt Cloud jobs from a Prefect flow.

[Image: The dbt Cloud Jobs page showing where to find the account ID and the job ID in the URL]

Before you can run a Prefect flow that will trigger a dbt Cloud job, you need to generate an API key in your account settings:

[Image: The dbt Cloud screen for generating an API key]

Ideally, add this token directly in the Prefect Cloud UI as a secret:

[Image: Adding the token as a secret in the Prefect Cloud UI]

Then you can use the token, together with the account_id and job_id, in the following flow.
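Here is a minimal sketch of such a flow using the DbtCloudRunJob task from the Task Library; the secret name DBT_CLOUD_TOKEN, the flow name, and the IDs are placeholders for your own values (check the task’s documentation for the full set of arguments):

```python
from prefect import Flow
from prefect.tasks.dbt import DbtCloudRunJob
from prefect.tasks.secrets import PrefectSecret

run_dbt_cloud_job = DbtCloudRunJob(
    account_id=12345,  # taken from the Jobs page URL
    job_id=67890,      # taken from the Jobs page URL
    cause="Triggered from a Prefect flow",
    wait_for_job_run_completion=True,
)

with Flow("dbt-cloud-elt") as flow:
    # Pull the dbt Cloud API token from a Prefect Secret at runtime
    api_token = PrefectSecret("DBT_CLOUD_TOKEN")
    run_dbt_cloud_job(token=api_token)
```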

When the flow run is finished, you can explore the links provided in the Artifacts tab:

[Image: The Artifacts tab with links to the dbt Cloud job run artifacts]

You can either download those or open the links directly in your browser. For example, when you visit the last link (run_results.json), you can see the output of the run, such as passed dbt tests:

[Image: The output of a run (run_results.json)]

If you install the JSON Viewer Chrome extension, your browser will render the downloaded JSON artifacts in a human-readable format.

This post covered how to reuse resources such as a Snowflake database connection across tasks in your ELT data pipeline. It discussed two ways of configuring custom notifications, either via automations in the UI or via state handlers in the flow code. Finally, it introduced the new DbtCloudRunJob task from the Task Library and demonstrated how it can be used in a flow.

If anything about what we’ve discussed in this post is unclear, feel free to ask a question in our community Slack.

We can’t wait to see what you build.

Thanks for reading, and happy engineering!
