All teams that craft a microservices architecture eventually run into the same problem: what happens when you need to manage multiple environments or run background tasks in parallel to your services?
You likely have a lot of jobs that don't seem to fit anywhere else - e.g., confirming user email addresses, processing documents, deploying a new Docker image. So what do you do? Usually, you write a little bit of code and put it…somewhere. A cron job here, a serverless function there…it's all just quick function deployments, right? This approach falls apart quickly when a failure occurs somewhere within that web of interdependent functions.
The solution? Use a microservices orchestration platform to manage and deploy all of your code from one place. In this article, I’ll show how to do exactly that using Prefect and Docker.
Microservices architectures include both long-running core app services and discrete, on-demand task queues that run tasks external to the app services themselves (i.e., background work and event processing).
🙋 Question 1: where do you centrally deploy this work from? Specifically - how do you keep track of resource consumption and various infrastructure requirements?
Consider a task queue as one type of service. Most background tasks don't require direct user interaction until the process completes. Many are too time-consuming to complete within the lifespan of a single HTTP request. Tasks still usually need to run in containers that must be deployed when needed and spun down when the task completes. Tasks might also depend on other services, such as requiring user input through an internal application once they finish.
Initially, you might gravitate towards using cron jobs. With their straightforward scheduling mechanism, they appear to be a simple solution for executing recurring tasks. But as the number of cron jobs increases, so does the need for visibility and control. You end up with cron jobs scattered throughout your infrastructure, on different clusters or different clouds altogether, making it difficult to track what's currently running, what's consuming your infra budget, and what's quietly failing over time.
Let's talk about serverless functions. They're great at providing scalability and flexibility, but they lack the comprehensive control and oversight necessary for complex microservices orchestration. In particular, debugging a set of intertwined serverless functions is quite difficult, given that no dependencies between them are encoded anywhere.
🙋 Question 2: how do you understand if this work is functioning properly? Specifically - how do you track failed runs, failed triggers, and missed dependencies?
A workflow orchestration platform brings order to the chaos of managing various microservices. Deploy your infrastructure consistently and reliably, whether it be for long-running services or ad-hoc background tasks. Treating your microservices as workflows that require orchestration gives them critical features such as centralized logging, retry logic, and a versatile set of triggers that can be schedule-, event-, or webhook-based.
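For example, in Prefect (the tool used in the rest of this article), retry behavior is a one-line annotation on a task. Here's a minimal sketch, assuming a hypothetical flaky HTTP dependency:

from prefect import flow, task
import requests

@task(retries=3, retry_delay_seconds=10)
def fetch_exchange_rates():
    # Hypothetical flaky dependency; any exception here triggers a retry
    response = requests.get("https://api.example.com/rates", timeout=5)
    response.raise_for_status()
    return response.json()

@flow
def refresh_rates():
    rates = fetch_exchange_rates()
    print(f"Fetched {len(rates)} rates")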
Ideally, your workflow orchestration platform should make it easy to package and run single tasks in a Docker container, deployed to the infrastructure and cloud of your choice. While these tasks might not require long-running infrastructure, a versatile solution should support long-lived behaviors such as an application backend as well. This is the beauty of microservices orchestration - when paired with workflow orchestration, any task or service that needs to be run in parallel to an application can be deployed in a centralized and consistent way.
What types of operations in a microservices architecture require workflow orchestration? A few common examples: confirming user email addresses, processing uploaded documents (like the OCR-based identity verification we'll build below), deploying new Docker images, and reacting to events such as file uploads or messages on a queue.
Prefect is a workflow orchestration system that enables writing and monitoring complex microservices across your infrastructure. Using Prefect, you can write flows in Python that you then package and deploy as Docker containers. You can then run this flow on any container orchestration system, such as Kubernetes or Amazon Elastic Container Service (ECS).
How you run your Docker-enabled Prefect workflows is up to you. You can run them on a schedule or as event-driven workflows triggered by, say, a post to a Kafka topic.
Let’s walk through an example of how Prefect enables this. Let’s assume you’re working at a financial institution that requires proof of identification for loan verification. When the user uploads their ID (driver’s license, etc.), you want to read the information automatically and verify its authenticity.
After analyzing the problem, you break it down into the following steps: retrieve the uploaded ID image from S3, process it with an OCR service to extract the relevant information, and write the extracted data to a database for verification.
Let's see how this works in practice. First, create a file containing the actions you’ll automate using Prefect flows. You can place the following code snippets in a file called document_verification.py.
Task 1: Retrieving a file from S3
import boto3
import requests
import psycopg2
from prefect import flow, task

@task
def retrieve_file_from_s3(bucket_name, file_key):
    s3_client = boto3.client('s3')
    file = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    return file['Body'].read()
Task 2: Processing an image with OCR
@task
def process_image_with_ocr(image_data):
    # Assuming image_data is in the correct format for the OCR API
    ocr_endpoint = "https://vision.googleapis.com/v1/images:annotate"
    response = requests.post(ocr_endpoint, json=image_data)
    return response.json()
Task 3: Writing data to a database
@task
def write_to_database(processed_data, db_connection_string):
    conn = psycopg2.connect(db_connection_string)
    cur = conn.cursor()
    # Example pseudocode to insert data
    # cur.execute("INSERT INTO your_table (column1, column2) VALUES (%s, %s)", (data1, data2))
    conn.commit()
    cur.close()
    conn.close()
Example Flow Script
@flow
def document_verification():
    bucket_name = "your_bucket"
    file_key = "your_file_key"
    db_connection_string = "your_db_connection_string"

    file_content = retrieve_file_from_s3(bucket_name, file_key)
    ocr_data = process_image_with_ocr(file_content)
    write_to_database(ocr_data, db_connection_string)

if __name__ == "__main__":
    document_verification()
When run manually from the command line, the flow looks like this:
The tasks define the individual components of your flow application. The flow script at the end runs these tasks in the desired order, supplying the output of the previous task as the input for the next task.
This is the core code for your flow. To get it to run, you’ll need to add a little additional code. But before going there, you’ll need to set up a work pool.
In Prefect, a flow runs on a work pool. A work pool organizes your flow runs and points to the back-end compute that Prefect will use to run your flow. Configuring a work pool enables you to use dynamic infrastructure - e.g., ephemeral containers - instead of running dedicated infra for your flows.
For Docker containers, Prefect supports two methods for configuring work pools: standard work pools, which rely on a worker process running in your environment to poll for scheduled runs and launch containers, and push work pools, where Prefect Cloud submits flow runs directly to serverless infrastructure (such as Amazon ECS) without requiring a worker.
Let’s walk through creating a push work pool for the above task. To create a Prefect work pool for ECS, install the AWS CLI and set up your credentials using aws configure. This is required so that Prefect can create infrastructure in your AWS account.
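With your credentials configured, create the work pool from the command line (this assumes Prefect's ECS push work pool type, ecs:push):

prefect work-pool create --type ecs:push --provision-infra document-verification-pool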
Here, document-verification-pool is the name of the work pool as it’ll appear in your push work pools list in Prefect. The --provision-infra flag means that Prefect will automatically provision necessary infrastructure and create a new push work pool.
After asking whether you want to customize your resource names, Prefect will describe the resources it'll create in your account and then ask if you want to create them. These include IAM roles and users for managing ECS tasks, an IAM policy, an Amazon Elastic Container Registry (ECR) repository to store your container images, and an ECS cluster for running your images.
Now that you’ve created a work pool, you need to create a deployment for your flow. While flows can be run locally (e.g., from your dev machine), you need to deploy a flow to Prefect Cloud in order to trigger it from an external process via a webhook.
First, create a deployment by using a Dockerfile to create a customized Docker image that’ll run your code. Save the following code to a file named Dockerfile in the same directory as your flow code:
# Use the latest Prefect 2 image with Python 3.10
FROM prefecthq/prefect:2-python3.10
# Add our requirements.txt file to the image and install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --trusted-host pypi.python.org --no-cache-dir
This Dockerfile uses the base Prefect image. It adds a couple of commands to install the dependent packages your Python code requires. Before running this, you'll need to create a requirements.txt file in the same directory with the following contents:
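boto3
requests
psycopg2-binary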
Next, replace __main__ in document_verification.py with the following code that creates a Docker-based deployment:
from prefect.deployments import DeploymentImage

...

if __name__ == "__main__":
    document_verification.deploy(
        name="document-verification-deployment",
        work_pool_name="document-verification-pool",
        image=DeploymentImage(
            name="prefect-flows:latest",
            platform="linux/amd64",
            dockerfile="Dockerfile"
        )
    )
Run this file on the command line using: python document_verification.py
You should see the following output:
So let me pause and explain, because there's a little magic happening here.
When you ran the prefect work-pool create command above, Prefect created both an Amazon ECS cluster and an Amazon ECR repository on your behalf. After you ran this command, it automatically logged you into your ECR repo and made it the default for Docker container builds and pushes.
Note: If your login to your ECR repository expires (which it will, eventually), you can use the standard get-login-password AWS CLI command to refresh it: aws ecr get-login-password --region aws-region | docker login --username AWS --password-stdin aws-account-number.dkr.ecr.aws-region.amazonaws.com
In the Python code you added, the deploy() method creates a new deployment using the DeploymentImage class. It builds the image on your behalf, pushes it to the ECR repository in your AWS account, and makes it the target container image for your deployment. Whenever you trigger this deployment, it'll run your flow code in an instance of this Docker image.
The end result is that you should see a new deployment in the Prefect Cloud UI:
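From here, you can also sanity-check the deployment by triggering a run manually from the CLI (this assumes Prefect's default flow naming, which derives the flow name from the function name):

prefect deployment run 'document-verification/document-verification-deployment'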
Now that you've deployed the flow and set it up with a push work pool, you need a way to start a flow run. You can use a Prefect Webhook to receive an SNS notification from S3 whenever a process uploads a new file to a designated S3 bucket.
You can create a webhook using the CLI:

prefect cloud webhook create document-verification-webhook \
  --description "Receives webhooks from the document verification" \
  --template '{"event": "notification.received", "resource": {"prefect.resource.id": "bucket.file.{{body.file_key}}"}}'
This creates a webhook with the name document-verification-webhook, a description, and a template. The template fires an event named notification.received and receives in the webhook call a parameter called file_key that is retrieved from the body of the HTTP request.
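To see what the template does, you can exercise the webhook directly once you have its URL (generated in the next step). A hypothetical test call, with a placeholder webhook URL and file key:

curl -X POST "https://api.prefect.cloud/hooks/your-webhook-id" \
  -H "Content-Type: application/json" \
  -d '{"file_key": "uploads/drivers-license-123.png"}'

Given the template above, this would emit a notification.received event whose resource ID is bucket.file.uploads/drivers-license-123.png.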
Prefect will generate a unique URL for your webhook. You can subscribe this URL to an SNS topic in AWS, and then set this topic to receive event notifications from an S3 bucket whenever any process creates a new S3 object.
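For example, subscribing the webhook URL to an SNS topic from the AWS CLI might look like this (the topic ARN and webhook URL are placeholders):

aws sns subscribe \
  --topic-arn arn:aws:sns:us-east-1:123456789012:document-uploads \
  --protocol https \
  --notification-endpoint https://api.prefect.cloud/hooks/your-webhook-id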
To associate this webhook with your deployment, you need to create an automation. Log in to the Prefect Cloud UI, go to Automations, and click to create a new automation with a Trigger Type of Custom. Then, fill in the following JSON configuration. The match field in the JSON tells Prefect to look at the event data sent by the external application. When it finds a successful match, it'll invoke your deployment:
{
  "match": {
    "prefect.resource.id": "bucket.file.*"
  },
  "match_related": {},
  "after": [],
  "expect": [],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Reactive",
  "threshold": 1,
  "within": 0
}
Select Next. On the next screen, in Action Type, select Run a deployment, and then select the deployment you created above.
Finally, select Next, give your automation a name (e.g., document-verification-automation), and save it. Now, every call to the webhook will run the document processing flow on the newly uploaded S3 document!
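End to end, a test run can be as simple as uploading a document to the bucket (assuming the bucket's event notifications publish to the SNS topic you subscribed the webhook to):

aws s3 cp drivers-license.png s3://your_bucket/drivers-license.png

A new flow run for the document-verification deployment should then appear in the Prefect Cloud UI.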
I’ve shown how, using Prefect, you can create ephemeral infrastructure to handle background tasks and task-like microservices. By moving this microservices work into a workflow orchestration platform like Prefect, you can deploy it all from a single, unified location, where you can easily respond to and diagnose failures. This level of monitoring is indispensable for ensuring work completes, rather than simply being “fired and forgotten”.
Prefect makes complex workflows simpler, not harder. Try Prefect Cloud for free for yourself, download our open source package, join our Slack community, or talk to one of our engineers to learn more.