Continuous Data Validation with Great Expectations and Prefect
blog

Continuous Data Validation with Great Expectations and Prefect

In this article, we’ll walk through how Great Expectations can easily be incorporated into Prefect flows and the opportunities that unlocks.

Alex Streed | Team Lead (Integrations)

March 31, 2022

Consistent issues with data integrity erodes stakeholder confidence and trust. When data is found to be unreliable, stakeholders will naturally stop relying on it. A troubling conundrum indeed.
Fortunately, Great Expectations provides a robust suite of tools for data engineers to build automated testing for their data. By defining expectations, data engineers are able to create assertions that verify the quality of their data based on expected attributes. Performing data validation before publishing data to stakeholders helps to ensure data integrity.
Sounds easy, but the dataflows that deliver data to stakeholders are often complex processes with many moving parts, so how can we incorporate data validation into our dataflows without upsetting the delicate balance?
Enter Prefect.
Prefect is a platform that can orchestrate tools across the data stack to work in harmony. Via Prefect’s easy to use Python client, data engineers can create dataflows via plain Python code to orchestrate complex processes. Prefect flows and tasks enable you to break dataflows down into discrete tasks, where you can insert data validation between tasks, if necessary, to maintain stakeholder confidence in the data that you provide.
In this article, we’ll walk though how Great Expectations can easily be incorporated into Prefect flows and the opportunities that unlocks. All code for this blog is available on GitHub.
Simple validation
Great Expectations validations can easily be executed within Prefect flows with the RunGreatExpectationsValidation task in Prefect’s task library.
In its simplest form, the RunGreatExpectationsValidation task allows flow authors to specify a pre-existing Great Expectations checkpoint to run during a Prefect flow run. A checkpoint is a convenient abstraction for bundling validation of data against an expectation suite. Running a checkpoint will cause Great Expectations to run through the configured suite of expectations and notify the user whether or not the expectations were met.
Here is an example of a Prefect flow that runs a validation with a Great Expectations checkpoint:
The above flow will run successfully if all the expectations outlined by the checkpoint successfully pass, and fail if any of the expectations are unsuccessful. Expectations vary in their application, but failures can be caused by values in a given column falling out of a given range, or if there are fewer rows that expected in the data set. This behavior is useful if you want to avoid loading or operating on non-compliant data.
Configuring the root context directory
By default, the RunGreatExpectationsValidation task will search the current working directory for a directory named great_expectations to discover the structure of your Great Expectations project. If this directory is located elsewhere, you can use the context_root_dir argument for your validation_task to change where the task looks. If you haven’t yet set up a Great Expectations data context, you can refer to Great Expectations’ getting started guide for how to set up a data context.
For example, if your Great Expectations project was located within the my-great-expectations-project directory within the current working directory:
Now that we have some of the basics of the RunGreatExpectationsValidation task under our belts, let’s see how we can use the RunGreatExpectationsValidation task within some more complex flows.
Running validation with Prefect Cloud
Let’s say we wanted to run validation within a flow orchestrated by Prefect Cloud. Prefect Cloud is Prefect’s hosted orchestration layer for managing your Prefect flows. You can learn how to deploy a flow to Prefect Cloud in the Prefect documentation.
When a flow is registered with Prefect Cloud, the flow itself with be registered and available for execution, but our Great Expectations project will not be available by default within the execution environment. If your Great Expectations project is hosted in a remote git repository like GitHub, you can include a step in your flow to clone the project before running the validation. Here is an example of flow that includes a custom clone_ge_project task used to pull the Great Expectations project from GitHub:
In this example, we clone our project containing our Great Expectations project and set context_root_dir equal to the local path where the project was cloned. This way our checkpoint and data context is available when we run our validation.
We clone the repository with a Prefect task so we automatically get retries for robustness in case GitHub is spotty. We’ve also parameterized this flow with Prefect Parameters so that we can change the remote location of our Great Expectations project without needing to modify our flow.
We also retrieve our GitHub access token as a Prefect Secret which allows us to securely store sensitive information and credentials that are necessary for a flow.
Validating transformed data in memory
Up to this point, we’ve been running validations on the classic Yellow Taxi Trips dataset included in the Great Expectations getting started tutorial, which is stored in the same repository as our flow and Great Expectations project. In a real world scenario, we’d most likely be running validations against data stored separately from our project or running validations on data that we’ve transformed to make sure that it’s compliant after our transformations.
Here’s an example how we can perform both of those operations in a Prefect flow:
This flow builds upon what we’ve done so far:
  • we’re pulling our Great Expectations project from GitHub in our clone_ge_project task
  • running our data against our preconfigured checkpoint with our run_valitation task which was initialized from RunGreatExpecatationsValidation
Now we’re also:
  • pulling data from S3 in our fetch_data task
  • transforming that data via Pandas in our adjust_passenger_count_task
  • constructing a RuntimeBatchRequest with our DataFrame in our create_runtime_batch_request task
  • running validation against an in-memory DataFrame with our run_validation task
To accomplish this we’re taking advantage of Prefect’s first class dataflow operations. We are able to pass complex objects like our DataFrame and our RuntimeBatchRequest between tasks without needing to write them to a database or external storage location. This means that we can load, transform, and validate our data in our Prefect flow all before loading that transformed data into another location like a data warehouse. Our transformation in this flow is to increment the passenger count in each row by one. An overly simple and illogical transformation, but hey — it’s a demo.
Validation Results as a Prefect Artifact
When running Great Expectations Validations in Prefect Cloud, the RunGreatExpectationsValidation task will create a Prefect Artifact containing the Great Expectations validation results. Prefect Artifacts can either be a URL or a markdown document for display in the Prefect Cloud UI. The RunGreatExpecationsValidation task constructs a markdown document from the Great Expectations validation results for easy viewing in the Prefect Cloud UI. This is great for debugging why a validation may have failed with out needing to maintain a separate location and managing uploads of validation results. Here is an example of validation results displayed in the Prefect Cloud UI:
undefined
Further up and further in
By using Prefect Cloud with Great Expectations we’re able to automatically run validations in our data processes and maintain our stakeholders confidence of the data that we provide, but these examples only scratch the surface of what is possible with Great Expectations and Prefect together.
By subclassing the RunGreatExpectationsValidation task, data engineers can create more specialized tasks that make it easier to perform complex validations across multiple flows. Taking advantage of the RunGreatExpectationsValidation task’s ability to accept runtime data contexts and checkpoints allows validations to be parameterized for use in lots of scenarios.
For more information, check out the documentation for the RunGreatExpectationsValidation task in the Prefect docs and How to Use Great Expectations with Prefect in the Great Expectations docs. There are tons of possibilities and we’re excited to see what you’ll build!
Happy engineering!