How to Use Prefect and Monte Carlo to Achieve More Reliable Data Pipelines

How to Use Prefect and Monte Carlo to Achieve More Reliable Data Pipelines

As recently announced, Prefect has a brand-new integration with Monte Carlo—a leading platform that adds observability features across your data environments, from ingestion in the data warehouse or lake to your downstream dashboards and reports. This hands-on post will dive into what Monte Carlo is and how to use it to add even more observability to your Prefect flows. You’ll learn the similarities and differences between the tools and why using both in tandem can be beneficial.

Anna Geller | Lead Community Engineer

February 1, 2022

What is Monte Carlo?
Monte Carlo is an end-to-end, fully automated data observability platform that can assess how your data warehouse operates, while also giving users the ability to set custom rules and thresholds to meet the needs of their data SLAs.
By continuously processing the outputs of your data warehouse (such as query logs, table schemas, execution metrics, etc.), Monte Carlo learns about your data access patterns and can, therefore:
  • Generate a data lineage graph (by parsing CRUD-operation queries), so that you can understand where data came from to increase confidence and identify root causes when problems occur,
  • Inform you about which tables are used frequently and which ones are never queried,
  • Track schema changes, data freshness, completeness, volume, and more to verify that data is accurate.
Monte Carlo sends alerts about anomalies and other unknown unknowns in your data based on custom rules and machine learning. With all those features, the platform allows for data observability from ingestion in the warehouse or lake to your analytics layer, centralized incident management, and quick resolution of data downtime issues.
How is Monte Carlo different from Prefect?
Both tools help improve data and data flow visibility. But whereas Prefect orchestrates data workflows and provides observability into their execution states, Monte Carlo analyzes the data itself—including lineage, schema changes, and data freshness, among other qualities.
Likewise, both frameworks help to monitor state changes, like a failure or unexplained updates. For instance, you can use Monte Carlo to find out why a particular report is broken and which downstream processes are affected. Based on the information that Monte Carlo collects, you may find out that:
  • someone changed a column name or a data type of a column,
  • or that the underlying data has not been updated for a week.
Monte Carlo adds observability to the results of your data pipelines. In contrast, Prefect can tell you why your data may not have arrived in a data warehouse because it monitors the entire lifecycle of your data pipeline's execution, not just their end result. That’s why using both platforms can be so beneficial.
The problem that Prefect’s integration for Monte Carlo can solve
Monte Carlo is able to track data lineage based on data warehouse query logs. It knows:
  • when the tables in the staging area have been created,
  • what is their schema and how this schema evolved (new columns added, the data type of a specific column changed, etc.),
  • when new rows are added or existing rows are updated,
  • when users query a particular table and who are those users.
What Monte Carlo isn’t able to automatically understand is exactly how data gets to the data warehouse or data lake in the first place—those data pipelines or flows that actually load data from source systems into the warehouse or lake. Those could be custom Extract & Load jobs, data ingestion sync jobs triggered from Fivetran or Airbyte, or this data may be generated entirely based on data science experiments. All those use cases can be orchestrated using Prefect, and this integration allows Monte Carlo users to understand exactly which job, pipeline, and source system the data came from, effectively creating those “missing” upstream nodes in Monte Carlo’s lineage.
Monte Carlo automatically infers consumption by BI assets (such as Looker dashboards), but it isn’t able to automatically understand how data is used in custom applications, such as ML models, APIs, or other data products. Thus, the integration with Prefect helps customers build a complete picture of data consumption.
This additional visibility into these relationships between critical data assets—and the data life cycle more broadly—cuts down on the time to detection and resolution for data downtime, enabling:
  • More efficient root cause analysis for data issues (Where did this data come from? How was it generated?),
  • Better change management processes that prevent data issues from occurring (What will I break if I push this schema change? Who do I need to notify of this change?),
  • True “self-service” data discovery workflows (Where is this data used? Can I trust it?).
The image below shows a concrete example of how the integration between Prefect and Monte Carlo can enhance the data lineage and provide visibility into:
  • how data is used and what downstream applications rely on that data,
  • which tables have been used to train specific ML models or build BI reports, and which downstream systems rely on data from a given table.
Prefect tasks for Monte Carlo
Here are use cases enabled by the new integration:
  1. Adding upstream nodes: add origin source systems (the nodes on the left in the image) to track lineage for raw data ingested into staging area of a data warehouse
  2. Adding downstream nodes: add nodes to track which processes are affected if specific data in a data warehouse is wrong or missing.
  3. Enrich existing tables with metadata from Prefect flows, e.g.:
    • add a tag based on Prefect context indicating the last time this data has been used or ingested in a specific Prefect flow,
    • add a tag to a specific table indicating whether it has passed data quality tests running on Prefect (e.g., using the integration),
    • add a tag saying that data from table X is currently used in a specific data science experiment or a custom application.
Demo time!
Let's look at how to use both tools together. The entire code for this demo is available in the following GitHub repository.
Adding nodes and edges to the lineage graph
Imagine that you have the following flow ingesting data into a staging area of your data warehouse:
If the underlying data originates from an e-commerce source system, we may want to add this system as an upstream node in a lineage graph. Here is how it can be accomplished in a Prefect flow:
The MonteCarloCreateOrUpdateLineage task requires us to specify the source and destination nodes. In our use case, we want to add an upstream node to indicate where the table prefect-community:jaffle_shop.raw_customers came from. The node name, object ID, object type, and resource name for the source node are arbitrary text fields, and you can use those as you see fit.
The destination node’s name and ID are usually identical to the display name you can see in your Monte Carlo Catalog. The resource name for the destination node is the name of your Monte Carlo data warehouse resource—in our example, it’s BigQuery. Monte Carlo primarily supports Snowflake, Redshift, BigQuery, but additional warehouses can be added as well. Later we will find out how to list all resources available in Monte Carlo using a separate Prefect task.
The tags are key-value pairs. The task requires that each tag is created using a dictionary with the propertyName as a tag name, and propertyValue as a tag value. In this example, the tag indicates which team owns this dataset.
tags=[{"propertyName": "dataset_owner", "propertyValue": "marketing"}]
Lastly, the expire_at field allows you to expire the relationship between those nodes. For instance, if you change the source system later, you can expire this edge and create a new one.
The end result of the task is that it creates a new node for the source node, as well as the edge between this new node and the existing destination table. Additionally, it adds both custom tags (our dataset owner) and Prefect-specific tags, as long as the prefect_context_tags flag is set to True (default).
Here is the end result in the Monte Carlo dashboard after we ran the flow:
Adding standalone nodes and tags to the lineage graph
An alternative solution to add the lineage information is to add tags to the existing tables. Let’s assume that for the table raw_payments you don't want to add an upstream node and edge. Instead, you wish to add the source system information as a tag.
You can see that currently, the table raw_payments has no tags:
Here is how we can add tags using a single API call in a Prefect task:
Now we can register and run the flow:
prefect register --project dwh -p flows/prefect run --name 02_add_custom_lineage_tags --execute
The UI proves that the flow run was successful:
And here is the final result in the Monte Carlo dashboard:
You can now go to the Monte Carlo Catalog and search for “stripe”. Make sure to check the “include tags” box. This way, we can easily find tables based on specific tags:
Querying Monte Carlo resources
The last Monte Carlo task is only querying the Monte Carlo API for resource names and allows you to find the exact resource name related to a specific table. Here is an example usage in a flow:
The output of the task will show your data warehouse(s) and any custom resources you added yourself:
Next steps
In this post, we looked at what Monte Carlo is and how it can be used with Prefect to provide greater visibility into your data operations through observability and lineage. This integration can help you to identify and resolve data incidents significantly faster and tells you what processes are affected by data quality issues of specific tables. Using Prefect and Monte Carlo together helps to build more reliable data systems by making it easier for you to resolve data downtime before it affects downstream consumers.
If anything about what we’ve discussed in this post is unclear, feel free to ask your questions in our community Slack.
We can’t wait to see what you build. Thanks for reading, and happy engineering!