In this article, we’ll walk through how Great Expectations can easily be incorporated into Prefect flows and the opportunities that unlocks.
Consistent issues with data integrity erode stakeholder confidence and trust. When data is found to be unreliable, stakeholders will naturally stop relying on it. A troubling conundrum indeed.
Fortunately, Great Expectations provides a robust suite of tools for data engineers to build automated testing for their data. By defining expectations, data engineers are able to create assertions that verify the quality of their data based on expected attributes. Performing data validation before publishing data to stakeholders helps to ensure data integrity.
Sounds easy, but the dataflows that deliver data to stakeholders are often complex processes with many moving parts. So how can we incorporate data validation into our dataflows without upsetting the delicate balance?
Prefect is a platform that can orchestrate tools across the data stack to work in harmony. With Prefect’s easy-to-use Python client, data engineers can write dataflows in plain Python code to orchestrate complex processes. Prefect flows and tasks let you break dataflows down into discrete tasks and insert data validation between them where necessary, so that you maintain stakeholder confidence in the data that you provide.
All code for this blog is available on GitHub.
Great Expectations validations can easily be executed within Prefect flows with the RunGreatExpectationsValidation task in Prefect’s task library.
In its simplest form, the RunGreatExpectationsValidation task allows flow authors to specify a pre-existing Great Expectations checkpoint to run during a Prefect flow run. A checkpoint is a convenient abstraction for bundling validation of data against an expectation suite. Running a checkpoint will cause Great Expectations to run through the configured suite of expectations and notify the user whether or not the expectations were met.
Here is an example of a Prefect flow that runs a validation with a Great Expectations checkpoint:
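A minimal sketch of such a flow, assuming Prefect 1.x (where the task is importable from prefect.tasks.great_expectations) and a checkpoint named my_checkpoint, which is only a placeholder for a checkpoint defined in your own project. The flow name and the build_validation_flow helper are likewise illustrative. The imports sit inside the function so the sketch only needs Prefect and Great Expectations once the flow is actually built.

```python
def build_validation_flow(default_checkpoint="my_checkpoint"):
    """Build a Prefect 1.x flow that runs a single Great Expectations checkpoint.

    "my_checkpoint" is a placeholder; use a checkpoint defined in your project.
    """
    # Deferred imports: Prefect and Great Expectations are only required
    # when the flow is built, not when this module is imported.
    from prefect import Flow, Parameter
    from prefect.tasks.great_expectations import RunGreatExpectationsValidation

    validation_task = RunGreatExpectationsValidation()

    with Flow("ge-validation") as flow:
        # Expose the checkpoint name as a parameter so each run can override it.
        checkpoint_name = Parameter("checkpoint_name", default=default_checkpoint)
        validation_task(checkpoint_name=checkpoint_name)

    return flow


# Example usage (requires Prefect 1.x and great_expectations to be installed):
#   build_validation_flow().run()
```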
The above flow will run successfully if all the expectations outlined by the checkpoint pass, and fail if any of them do not. Expectations vary in their application, but failures can be caused by values in a given column falling outside a given range, or by there being fewer rows than expected in the data set. This behavior is useful if you want to avoid loading or operating on non-compliant data.
Configuring the root context directory
By default, the RunGreatExpectationsValidation task will search the current working directory for a directory named great_expectations to discover the structure of your Great Expectations project. If this directory is located elsewhere, you can use the context_root_dir argument for your RunGreatExpectationsValidation task to change where the task looks. If you haven’t yet set up a Great Expectations data context, you can refer to Great Expectations’ getting started guide for how to set one up.
For example, if your Great Expectations project was located within the my-great-expectations-project directory within the current working directory:
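A sketch of that configuration, assuming Prefect 1.x and assuming that my-great-expectations-project contains the conventional great_expectations directory (if that directory is itself the context root, pass it directly instead). The helper names and the my_checkpoint checkpoint are placeholders.

```python
from pathlib import Path


def context_root_for(project_dir):
    """Return the conventional great_expectations directory inside a project.

    Assumes the standard layout, with great_expectations/ at the project root.
    """
    return Path(project_dir) / "great_expectations"


def build_flow(project_dir="my-great-expectations-project"):
    """Build a Prefect 1.x flow whose validation task uses a custom context root."""
    from prefect import Flow
    from prefect.tasks.great_expectations import RunGreatExpectationsValidation

    validation_task = RunGreatExpectationsValidation(
        checkpoint_name="my_checkpoint",  # placeholder checkpoint name
        context_root_dir=str(context_root_for(project_dir)),
    )

    with Flow("ge-validation-custom-root") as flow:
        validation_task()

    return flow
```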
Now that we have some of the basics of the RunGreatExpectationsValidation task under our belts, let’s see how we can use it within some more complex flows.
Running validation with Prefect Cloud
Let’s say we wanted to run validation within a flow orchestrated by Prefect Cloud. Prefect Cloud is Prefect’s hosted orchestration layer for managing your Prefect flows. You can learn how to deploy a flow to Prefect Cloud in the Prefect documentation.
When a flow is registered with Prefect Cloud, the flow itself will be registered and available for execution, but our Great Expectations project will not be available by default within the execution environment. If your Great Expectations project is hosted in a remote git repository like GitHub, you can include a step in your flow to clone the project before running the validation. Here is an example of a flow that includes a custom clone_ge_project task used to pull the Great Expectations project from GitHub:
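A sketch of such a flow, assuming Prefect 1.x. Only the clone_ge_project task name comes from the text; the helper functions, the GITHUB_ACCESS_TOKEN secret name, the repository URL placeholder, the ge_project target directory, and the assumption that the repository keeps great_expectations at its root are all illustrative.

```python
import shutil
import subprocess
from datetime import timedelta
from urllib.parse import urlsplit, urlunsplit


def authenticated_url(repo_url, token):
    """Embed an access token in an https clone URL."""
    parts = urlsplit(repo_url)
    return urlunsplit(parts._replace(netloc=f"{token}@{parts.netloc}"))


def clone_repo(repo_url, token, target_dir):
    """Shallow-clone the repository and return the assumed context root."""
    # Remove any partial clone left behind by a failed earlier attempt.
    shutil.rmtree(target_dir, ignore_errors=True)
    subprocess.run(
        ["git", "clone", "--depth", "1", authenticated_url(repo_url, token), target_dir],
        check=True,
    )
    # Assumption: great_expectations/ sits at the repository root.
    return f"{target_dir}/great_expectations"


def build_flow():
    """Build a Prefect 1.x flow that clones the project, then validates."""
    from prefect import Flow, Parameter, task
    from prefect.tasks.great_expectations import RunGreatExpectationsValidation
    from prefect.tasks.secrets import PrefectSecret

    # Retries are opt-in in Prefect 1.x: set max_retries and retry_delay.
    clone_ge_project = task(
        clone_repo,
        name="clone_ge_project",
        max_retries=3,
        retry_delay=timedelta(seconds=10),
    )
    validation_task = RunGreatExpectationsValidation()

    with Flow("ge-validation-cloud") as flow:
        repo_url = Parameter("repo_url", default="https://github.com/<owner>/<repo>.git")
        checkpoint_name = Parameter("checkpoint_name", default="my_checkpoint")
        token = PrefectSecret("GITHUB_ACCESS_TOKEN")  # hypothetical secret name
        context_root = clone_ge_project(repo_url, token, "ge_project")
        validation_task(checkpoint_name=checkpoint_name, context_root_dir=context_root)

    return flow
```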
In this example, we clone the repository containing our Great Expectations project and set context_root_dir equal to the local path where the project was cloned. This way our checkpoint and data context are available when we run our validation.
We clone the repository in a Prefect task so that Prefect can retry the clone for us in case GitHub is spotty. We’ve also parameterized this flow with Prefect Parameters so that we can change the remote location of our Great Expectations project without needing to modify our flow.
We also retrieve our GitHub access token as a Prefect Secret, which allows us to securely store sensitive information and credentials that are necessary for a flow.
Validating transformed data in memory
Up to this point, we’ve been running validations on the classic Yellow Taxi Trips dataset included in the Great Expectations getting started tutorial, which is stored in the same repository as our flow and Great Expectations project. In a real-world scenario, we’d most likely be running validations against data stored separately from our project or running validations on data that we’ve transformed to make sure that it’s compliant after our transformations.
Here’s an example of how we can perform both of those operations in a Prefect flow:
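A sketch of a flow along these lines, assuming Prefect 1.x, pandas with the optional s3fs package for s3:// URIs, and a Great Expectations datasource configured with a runtime data connector. The clone_ge_project and run_validation names follow the text; every other task name, the datasource, connector, asset, suite, and checkpoint names, and the S3 and repository locations are placeholders.

```python
import shutil
import subprocess
from datetime import timedelta


def add_one_passenger(df):
    """Return a copy of the frame with passenger_count incremented by one."""
    out = df.copy()
    out["passenger_count"] = out["passenger_count"] + 1
    return out


def runtime_batch_request_kwargs(df):
    """Keyword arguments for a RuntimeBatchRequest wrapping an in-memory frame.

    All names below are placeholders and must match the datasource
    configured in your own data context.
    """
    return {
        "datasource_name": "my_datasource",
        "data_connector_name": "default_runtime_data_connector_name",
        "data_asset_name": "taxi_trips_in_memory",
        "runtime_parameters": {"batch_data": df},
        "batch_identifiers": {"default_identifier_name": "default_identifier"},
    }


def build_flow():
    """Build a Prefect 1.x flow that loads, transforms, and validates in memory."""
    import pandas as pd
    from great_expectations.core.batch import RuntimeBatchRequest
    from prefect import Flow, Parameter, task
    from prefect.tasks.great_expectations import RunGreatExpectationsValidation

    @task(max_retries=3, retry_delay=timedelta(seconds=10))
    def clone_ge_project(repo_url, target_dir="ge_project"):
        # Clear any partial clone from a failed attempt before retrying.
        shutil.rmtree(target_dir, ignore_errors=True)
        subprocess.run(["git", "clone", "--depth", "1", repo_url, target_dir], check=True)
        return f"{target_dir}/great_expectations"

    @task
    def load_data(s3_uri):
        # pandas can read s3:// URIs when s3fs is installed.
        return pd.read_csv(s3_uri)

    transform = task(add_one_passenger, name="transform")

    @task
    def create_batch_request(df):
        return RuntimeBatchRequest(**runtime_batch_request_kwargs(df))

    run_validation = RunGreatExpectationsValidation()

    with Flow("ge-validation-in-memory") as flow:
        repo_url = Parameter("repo_url", default="https://github.com/<owner>/<repo>.git")
        s3_uri = Parameter("s3_uri", default="s3://<bucket>/<key>.csv")
        context_root = clone_ge_project(repo_url)
        df = transform(load_data(s3_uri))
        batch_request = create_batch_request(df)
        # The DataFrame and batch request are passed between tasks in memory.
        run_validation(
            checkpoint_name="my_checkpoint",
            context_root_dir=context_root,
            checkpoint_kwargs={
                "validations": [
                    {"batch_request": batch_request, "expectation_suite_name": "my_suite"}
                ]
            },
        )

    return flow
```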
This flow builds upon what we’ve done so far:
we’re pulling our Great Expectations project from GitHub in our clone_ge_project task
we’re running our data against our preconfigured checkpoint with our run_validation task, which was initialized from RunGreatExpectationsValidation
Now we’re also:
pulling data from S3 in its own task
transforming that data via Pandas in a second task
creating a RuntimeBatchRequest with our DataFrame in a third task
running validation against an in-memory DataFrame with our run_validation task
To accomplish this, we’re taking advantage of Prefect’s first-class dataflow operations. We are able to pass complex objects like our DataFrame and our RuntimeBatchRequest between tasks without needing to write them to a database or external storage location. This means that we can load, transform, and validate our data in our Prefect flow, all before loading that transformed data into another location like a data warehouse. Our transformation in this flow is to increment the passenger count in each row by one. An overly simple and illogical transformation, but hey, it’s a demo.
Validation Results as a Prefect Artifact
When running Great Expectations validations in Prefect Cloud, the RunGreatExpectationsValidation task will create a Prefect Artifact containing the Great Expectations validation results. A Prefect Artifact can be either a URL or a markdown document for display in the Prefect Cloud UI. The RunGreatExpectationsValidation task constructs a markdown document from the Great Expectations validation results for easy viewing in the Prefect Cloud UI. This is great for debugging why a validation may have failed without needing to maintain a separate location or manage uploads of validation results. Here is an example of validation results displayed in the Prefect Cloud UI:
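For a rough feel of what a markdown artifact of this kind could contain, here is a simplified sketch. It is not how the RunGreatExpectationsValidation task renders its artifact. The input shape is assumed to follow the JSON form of a Great Expectations validation result (success, statistics, and a results list), and both function names are hypothetical. Publishing assumes Prefect 1.x and only has an effect inside a task run backed by Prefect Cloud or Server.

```python
def validation_results_markdown(results):
    """Render a small markdown summary from a validation-results dictionary."""
    stats = results.get("statistics", {})
    status = "Succeeded" if results.get("success") else "Failed"
    lines = [
        f"# Validation results: {status}",
        "",
        f"{stats.get('successful_expectations', 0)} of "
        f"{stats.get('evaluated_expectations', 0)} expectations met",
        "",
        "| Expectation | Column | Status |",
        "| --- | --- | --- |",
    ]
    for item in results.get("results", []):
        config = item.get("expectation_config", {})
        column = config.get("kwargs", {}).get("column", "")
        mark = "passed" if item.get("success") else "failed"
        lines.append(f"| {config.get('expectation_type', '')} | {column} | {mark} |")
    return "\n".join(lines)


def publish_validation_artifact(results):
    """Publish the summary as a markdown artifact (call from inside a task)."""
    # Deferred import: only needed when running under Prefect 1.x.
    from prefect.backend.artifacts import create_markdown_artifact

    create_markdown_artifact(validation_results_markdown(results))
```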
Further up and further in
By using Prefect Cloud with Great Expectations, we’re able to automatically run validations in our data processes and maintain our stakeholders’ confidence in the data that we provide. But these examples only scratch the surface of what is possible with Great Expectations and Prefect together.
By subclassing the RunGreatExpectationsValidation task, data engineers can create more specialized tasks that make it easier to perform complex validations across multiple flows. Taking advantage of the RunGreatExpectationsValidation task’s ability to accept runtime data contexts and checkpoints allows validations to be parameterized for use in lots of scenarios.
For more information, check out the documentation for the RunGreatExpectationsValidation task in the Prefect docs and How to Use Great Expectations with Prefect in the Great Expectations docs. There are tons of possibilities and we’re excited to see what you’ll build!