There are myriad reasons why one might wish to trigger a Prefect flow run from an event. One of the most common use cases we have encountered: a file lands in object storage (S3, GCS, Azure Blob) and will be the input to a Prefect flow.
Chris White wrote an awesome blog a few years back about using AWS Lambda to trigger Prefect flows through the Prefect Cloud GraphQL API. Since then, the team has been working hard to further reduce negative engineering and make event-driven flows even easier.
The feature of this pattern that I find most useful is the ability to pass overrides to the RunConfig: specifically, setting ECS/Kubernetes overrides to ensure that our work runs on the appropriate infrastructure.
In the example below, we have a Lambda that checks the size of a file in S3, then uses the RunConfig to set a memory request, which ensures that an instance with a sufficient amount of memory schedules the flow.
Our Lambda is going to use the Prefect Client to kick off our flow. We could do all of this through the GraphQL API with urllib, but the Prefect Client keeps our code tidy, legible, and easy to modify.
Let’s walk through our Lambda function:
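A minimal sketch of what such a Lambda might look like, assuming Prefect 1.x's Client and KubernetesRun; the helper names match the notes that follow, while the memory thresholds, parameter path, environment variable, and run-name format are illustrative:

```python
# Hypothetical Lambda handler sketch -- thresholds and names are illustrative.
import os
import urllib.parse


def decrypt_parameter(name):
    """Fetch and decrypt a secret from the AWS Systems Manager Parameter Store."""
    import boto3  # imported lazily so the pure sizing logic below stays testable

    ssm = boto3.client("ssm")
    return ssm.get_parameter(Name=name, WithDecryption=True)["Parameter"]["Value"]


def memory_request_for(size_bytes):
    """Map a file size to a Kubernetes memory request (illustrative thresholds)."""
    if size_bytes < 100 * 1024 ** 2:  # under 100 MiB
        return "512Mi"
    if size_bytes < 1024 ** 3:        # under 1 GiB
        return "2Gi"
    return "8Gi"


def get_memory_required(bucket, key):
    """Interrogate the S3 API for the uploaded object's size."""
    import boto3

    s3 = boto3.client("s3")
    size_bytes = s3.head_object(Bucket=bucket, Key=key)["ContentLength"]
    return memory_request_for(size_bytes)


def trigger_flow_run(key, memory_request):
    """Kick off a Prefect flow run with a memory request on its Kubernetes job."""
    from prefect import Client
    from prefect.run_configs import KubernetesRun

    client = Client(api_key=decrypt_parameter("/prefect/api_key"))
    return client.create_flow_run(
        version_group_id=os.environ["PREFECT_VERSION_GROUP_ID"],
        run_name=f"s3-{key}",
        run_config=KubernetesRun(memory_request=memory_request),
    )


def run(event, context):
    """Entry point: extract the S3 key from a PUT event and trigger the flow."""
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = urllib.parse.unquote_plus(record["object"]["key"])
    return trigger_flow_run(key, get_memory_required(bucket, key))
```
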
A few notes about this Lambda:
decrypt_parameter decrypts secrets from the AWS Systems Manager Parameter Store.
The run function takes an S3 PUT event and extracts the Key from the payload.
get_memory_required interrogates the S3 API for the size of the file that was uploaded.
trigger_flow_run kicks off a Prefect flow run with a custom name derived from the S3 key. We pass the memory requirement derived from the file size to the RunConfig in order to set a memory request on the Kubernetes job that will run our flow.
A key point here: we are using the version_group_id, which means that the unique unarchived flow within this version group will be scheduled to run. This input can be used as a stable API for running flows that are regularly updated.
This allows us to set our Lambda and forget it, triggering the desired flow regardless of new versions.
Personally, I feel that the AWS-native experience of deploying Python Lambda functions is a bad time. Whether through the console or the AWS CLI, the installing, zipping, and dependency management is a chore.
There are several tools that reduce the pain of deployment. My favorite is the Serverless Framework.
Serverless allows us to define our function in a simple serverless.yml file. Further, Serverless has its own concept of plugins; most relevant to us is the serverless-python-requirements plugin, which makes packaging Python requirements painless. The file is broken up into sections from top to bottom:
provider defines platform-specific configurations. In our case, we have Lambda runtime settings and some IAM role configurations.
functions defines any number of Lambda functions, as well as some incredibly useful ancillary options:
handler specifies the function that Lambda will invoke as the entry point.
layers allows us to take advantage of the work that the serverless-python-requirements plugin does and package the Python requirements separately from our code, which means we can still edit code in the Lambda console editor.
events is one of my favorites: through it, we can automatically subscribe our Lambda to AWS events. In our example, we are subscribing our function to specific events from an S3 bucket.
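Putting those sections together, a sketch of what such a serverless.yml might look like; the service name, runtime, region, bucket, and IAM scope are placeholders:

```yaml
# Illustrative serverless.yml -- names and values are placeholders.
service: prefect-flow-trigger

provider:
  name: aws
  runtime: python3.9
  region: us-east-1
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - s3:GetObject
            - ssm:GetParameter
          Resource: "*"

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    layer: true  # package requirements as a Lambda layer

functions:
  trigger:
    handler: handler.run
    layers:
      - Ref: PythonRequirementsLambdaLayer
    events:
      - s3:
          bucket: my-input-bucket
          event: s3:ObjectCreated:Put
```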
Putting it all together, we run serverless deploy and watch:
Stepping through the logs, we can see exactly what happens, in sequence:
Spins up a Docker container to package all of the aforementioned Python requirements.
Creates CloudFormation templates from our YAML file.
Zips up our code and dependencies.
Uploads everything to S3.
Once deployed, we can test by dropping a file into the specified bucket or by triggering through the Lambda console.
As a next step, you could use this pattern to incorporate event-driven flows with Prefect into your pipeline. Convert a flow that is currently polling into one orchestrated by an external function.
The Prefect client provides a straightforward way to interact with Prefect APIs. Kicking off flow runs is just a small example of the ways that users can take advantage of our Cloud API.
Prefect is the new standard for dataflow automation, and our ultimate mission is to eliminate negative engineering by ensuring that data professionals can confidently and efficiently automate their data applications with the most user-friendly toolkit around.
Join our Slack community for ad-hoc questions.
Follow us on Twitter for updates.
Visit us on GitHub to open issues and pull requests.
Check out our Discourse knowledge base.