Easy Event-Driven Workflows With AWS Lambda V2 Featuring Serverless Framework
Mar. 8, 2022

Easy Event-Driven Workflows With AWS Lambda V2 Featuring Serverless Framework

George Coyne
George CoyneSenior Solutions Architect

There are a myriad of reasons why one may wish to trigger a Prefect flow run from an event. One of the most common use cases that we have encountered is: A file lands in object storage (S3, GCS, Azure Blob) and will be the input to a Prefect flow.

Chris White wrote an awesome blog a few years back about using AWS Lambda to trigger Prefect flows through Prefect Cloud GraphQL API. Since then the team have been working hard to further reduce negative engineering and make event-driven flows even easier.

Of this pattern, the feature that I find most useful is the ability to pass overrides to the RunConfig — specifically the setting an ECS/Kubernetes overrides to ensure that our work runs on the appropriate infrastructure.

In the example we’ll show below, we have a Lambda that will check for a file size in S3, then use the RunConfig to set a memory request, which will ensure that an instance with a sufficient amount of memory schedules the flow.

Our Lambda is going to use the Prefect Client to kick off our flow. Now we could do all of this through the GraphQL API, and use requests or urllib, but the Prefect Client keeps our code tidy, legible, and easy to modify.

Let’s walk through our Lambda function:

A few notes about this Lambda:

  • decrypt_parameter decrypts secrets from the AWS System Manager Parameter Store.

  • Our run function takes an S3 PUT event and extracts Bucket and Key from the payload.

  • get_memory_required interrogates the S3 API for information about the size of the file that was uploaded.

  • trigger_flow_run kicks off a Prefect flow run with a custom name derived from the S3 key. We pass the file size derived from get_memory_required to the KubernetesRun RunConfig in order to set a memory request on the Kubernetes job that will be running our flow.

A key point here. We are using the version_group_id which is the unique unarchived flow within this version group will be scheduled to run. This input can be used as a stable API for running flows that are regularly updated.

This allows us to set our Lambda and forget it, triggering the desired flow regardless of new versions.

Personally, I feel that the AWS-native experience of deploying Python Lambda functions is a bad time. Whether through the console or through AWS CLI, the installing and zipping and dependency management is a chore.

There are several tools that reduce the pain of deployment. My favorite is serverless framework.

Serverless allows us to define our function in a simple serverless.yml file. Further, serverless has their own concept of plugins. Most relevant to us is the serverless-python-requirements plugin, which makes the packaging Python requirements painless. The file is broken up into arrays from top to bottom:

  • provider defines platform specific configurations. In our case, we have Lambda runtime settings and some IAM role configurations.

    • functions defines any number of Lambda functions as well as some incredibly useful ancillary options:

    • handler specifies which function Lambda is going to start our function.

      layers allows us to take advantage of the work that our serverless-python-requirements plugin does and package the Python requirements separately from our code, which means that we can still edit code in the Lambda IDE.

    • events is one of my favorites, through it we can automatically subscribe our lambda to AWS events. In our example, we are subscribing our function to specific events from an S3 bucket.

Putting it all together we run serverless deploy and watch:

Stepping through the logs we can see exactly what is happening, in sequence serverless:

  • Spins up a Docker container to package all of the aforementioned Python requirements.

  • Creates CloudFormation templates from our YAML file.

  • Zips up our code and dependencies.

  • Uploads everything to S3.

Once deployed we can test by dropping a file in the specified bucket or triggering through the Lambda console.

As a next step, you could use this pattern to incorporate event-driven flows with Prefect into your pipeline. Convert a flow that is currently polling into one orchestrated by an external function.

The Prefect client provides a straightforward way to interact with Prefect APIs. Kicking off flow runs is just a small example of the ways that users can take advantage of our Cloud API.

Prefect is the new standard for dataflow automation, and our ultimate mission is to eliminate negative engineering by ensuring that data professionals can confidently and efficiently automate their data applications with the most user-friendly toolkit around.

Happy engineering!

Posted on Mar 8, 2022
Blog Post
AWS Lambda
Amazon S3
Dynamic DAGs
DevOps & CI/CD

Love your workflows again

Orchestrate your stack to gain confidence in your data