Workflow Orchestration

Scalable Microservices Orchestration with Prefect and Docker

January 25, 2024
Jay Allen

All teams that craft a microservices architecture eventually run into the same problem: what happens when you need to manage multiple environments or run background tasks in parallel to your services?

You likely have a lot of jobs that don’t seem to fit anywhere else - e.g., confirming user email addresses, processing documents, deploying a new Docker image. So what do you do? Usually, you write a little bit of code and put it…somewhere. A cron job here, a serverless function there…it’s all just quick function deployments, right? This approach falls apart the moment something fails somewhere in this web of interdependent functions.

The solution? Use a microservices orchestration platform to manage and deploy all of your code from one place. In this article, I’ll show how to do exactly that using Prefect and Docker.

The need for a microservices orchestration platform

Microservices architectures include both long-running core app services and discrete on-demand task queues that run work external to the app services themselves (i.e., background jobs and event processing).

🙋 Question 1: where do you centrally deploy this work from? Specifically - how do you keep track of resource consumption and various infrastructure requirements?

Consider a task queue as one type of service. Most background tasks don’t require direct user interaction until the process completes, and many are too time-consuming to finish within the lifespan of a single HTTP request. Tasks still usually need to run in containers that are deployed on demand and spun down when the task completes. Tasks might also have dependencies on other services, such as requiring user input through an internal application once they finish.

Initially, you might gravitate towards cron jobs. With their straightforward scheduling mechanism, they appear to be a simple solution for executing recurring tasks. But as the number of cron jobs increases, so does the need for visibility and control. You end up with cron jobs scattered throughout your infra - on different clusters or different clouds altogether - making it difficult to track what's currently running, what's eating your infra budget, and what's quietly failing over time.

Let’s talk about serverless functions. They’re great at providing scalability and flexibility, but they lack the comprehensive control and oversight necessary for complex microservices orchestration. In particular, debugging a set of intertwined serverless functions is quite difficult, since the dependencies between them aren't encoded anywhere.

🙋 Question 2: how do you understand if this work is functioning properly? Specifically - how do you track failed runs, failed triggers, and missed dependencies?

A workflow orchestration platform brings order to the chaos of managing various microservices. Deploy your infrastructure consistently and reliably, whether for long-running services or ad-hoc background tasks. Treating your microservices as workflows that require orchestration gives them critical features such as centralized logging, retry logic, and flexible triggers based on schedules, events, or webhooks.
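
To make that concrete, here's a minimal sketch of what those features look like in Prefect (the task, flow, and retry settings below are illustrative):

from prefect import flow, task, get_run_logger

@task(retries=3, retry_delay_seconds=10)   # failed calls are retried automatically
def confirm_email(address: str):
    logger = get_run_logger()               # log lines land in the orchestrator's UI
    logger.info("Sending confirmation to %s", address)

@flow
def onboarding(address: str = "user@example.com"):
    confirm_email(address)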

Ideally, your workflow orchestration platform should make it easy to package and run single tasks in a Docker container, deployed to the infrastructure and cloud of your choice. While these tasks might not require long-running infrastructure, a versatile solution should support long-lived behaviors such as an application backend as well. This is the beauty of microservices orchestration - when paired with workflow orchestration, any task or service that needs to be run in parallel to an application can be deployed in a centralized and consistent way.

Microservices orchestration use cases

What types of operations in a microservices architecture require workflow orchestration? Here are a few common examples:

  • Deployment of Docker images for machine learning models: To deploy a new machine learning model, you can automate the process of building and pushing a new Docker image containing the update and pointing your application to the new model version. This enables launching changes with minimal to zero manual intervention, while encoding the dependency between the model deployment and how the app will use the model.
  • Infrastructure management: You can decrease cloud costs by tearing down unused infrastructure automatically. An orchestration platform can automate tasks like spinning up infrastructure components at a specific time, decommissioning them, or restarting them in response to failures.
  • Data processing in response to events: If you’re DocuSign and a new user file appears in a storage bucket, an orchestration platform can automatically trigger a container to process the data immediately. Upon completion, you can notify the user directly in the app. (Note that this requires formally passing state between the task service and the application)
  • On-demand information extraction from documents: Suppose you need to extract text from forms, such as loan applications. You can spin up a container specifically for this purpose and shut it down right after, optimizing resource utilization and reducing costs. You can then coordinate the result directly with the loan application, passing parameters on loan acceptance or denial to trigger a follow-up experience for the user.

Microservices orchestration with Docker on Prefect

Prefect is a workflow orchestration system that enables writing and monitoring complex microservices across your infrastructure. Using Prefect, you can write flows in Python that you then package and deploy as Docker containers. You can then run these flows on any container orchestration system, such as Kubernetes or Amazon Elastic Container Service (ECS).

How you run your Docker-enabled Prefect workflows is up to you. You can run them on a schedule or as event-driven workflows triggered by, say, a post to a Kafka topic.
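
For instance, running a flow on a schedule needs nothing more than a call to serve() - a minimal sketch, assuming Prefect 2.x (the flow name and cron expression are illustrative):

from prefect import flow

@flow
def nightly_report():
    ...  # your task calls go here

if __name__ == "__main__":
    # serve() keeps a lightweight process running that polls for scheduled runs;
    # here, one run every night at 2 AM
    nightly_report.serve(name="nightly-report", cron="0 2 * * *")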

Let’s walk through an example of how Prefect enables this. Let’s assume you’re working at a financial institution that requires proof of identification for loan verification. When the user uploads their ID (driver’s license, etc.), you want to read the information automatically and verify its authenticity.

After analyzing the problem, you break it down into the following steps:

  1. Document upload: The user's application begins with the upload of their driver's license. You store the upload in an Amazon S3 bucket.
  2. Notification trigger: Upon successful upload, the application sends a notification to an AWS Simple Notification Service (SNS) topic. This acts as a trigger for the subsequent steps.
  3. Prefect flow activation: The notification in the AWS SNS topic is configured to feed into a Prefect flow via a Prefect webhook. Webhooks enable triggering a Prefect flow from any external process that can make an HTTPS request.
  4. Data processing via containers: Prefect spins up a container in Amazon Elastic Container Service (ECS) to process the request. This container contains the code to extract necessary information from the uploaded driver's license.
  5. Information analysis: The Docker container sends the extracted data to the underwriting team's internal tool for further analysis and decision-making.
  6. Decision communication: Once the underwriting team makes a decision, an application on their end sends a notification to another Prefect flow using a separate webhook. Following the receipt of the decision, your second Prefect flow will update the relevant database with the new information. It also sends an email to the user updating them on the status of their request.

Package flow code into a Docker container

Let's see how this works in practice. First, create a file containing the actions you’ll automate using Prefect flows. You can place the following code snippets in a file called document_verification.py.

Task 1: Retrieving a file from S3

import base64
import json

import boto3
import psycopg2
import requests
from prefect import flow, task

@task
def retrieve_file_from_s3(bucket_name, file_key):
    # boto3 resolves AWS credentials from the environment, your AWS config,
    # or (when running in ECS) the task's IAM role
    s3_client = boto3.client('s3')
    file = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    return file['Body'].read()

Task 2: Processing an image with OCR

@task
def process_image_with_ocr(image_data):
    # The Google Cloud Vision API expects base64-encoded image content;
    # add your API key or OAuth credentials to the request in production
    ocr_endpoint = "https://vision.googleapis.com/v1/images:annotate"
    payload = {"requests": [{"image": {"content": base64.b64encode(image_data).decode()},
                             "features": [{"type": "TEXT_DETECTION"}]}]}
    response = requests.post(ocr_endpoint, json=payload)
    return response.json()

Task 3: Writing data to a database

@task
def write_to_database(processed_data, db_connection_string):
    conn = psycopg2.connect(db_connection_string)
    cur = conn.cursor()
    # Hypothetical table: adjust the table and column names to your own schema
    cur.execute(
        "INSERT INTO document_ocr_results (payload) VALUES (%s)",
        (json.dumps(processed_data),),
    )
    conn.commit()
    cur.close()
    conn.close()

Example Flow Script

@flow
def document_verification():
    bucket_name = "your_bucket"
    file_key = "your_file_key"
    db_connection_string = "your_db_connection_string"

    file_content = retrieve_file_from_s3(bucket_name, file_key)
    ocr_data = process_image_with_ocr(file_content)
    write_to_database(ocr_data, db_connection_string)

if __name__ == "__main__":
    document_verification()

When you run the flow manually from the command line, Prefect logs the flow run and each task run as they execute.

The tasks define the individual components of your flow application. The flow script at the end runs these tasks in the desired order, supplying the output of the previous task as the input for the next task.

This is the core code for your flow. To get it to run, you’ll need to add a little additional code. But before going there, you’ll need to set up a work pool.

Create a work pool for our Docker container

In Prefect, a flow runs on a work pool. A work pool organizes your flow runs and points to the back-end compute that Prefect will use to run your flow. Configuring a work pool lets you use dynamic infrastructure - e.g., ephemeral containers - instead of running dedicated infra for your flows.

For Docker containers, Prefect supports three methods for configuring work pools:

  • Use a serverless push work pool. In a push work pool, Prefect automates creating the required cloud infrastructure to run worker containers in your cloud account. That eliminates the need to pre-provision compute via a separate deployment.
  • Create a work pool that runs on your own infrastructure - e.g., your own existing Amazon ECS cluster, GKE cluster, etc. Use this option if you have custom configuration requirements.
  • Create a managed work pool. Prefect creates both the work pool and the compute capacity for you on its own cloud infrastructure. This frees you from having to manage any compute resources - Prefect does all the work for you, from soup to nuts.

Let’s walk through creating a push work pool for the above task:

  • First, install the AWS CLI and configure your credentials using aws configure. This is required so that Prefect can create infrastructure in your AWS account.
  • Next, create a Prefect account (if you don’t have one).
  • Then, install Prefect and its associated command-line tools locally using pip: pip install -U prefect
  • Log in to Prefect with the following command: prefect cloud login
  • Finally, create a work pool like so: prefect work-pool create --type ecs:push --provision-infra document-verification-pool

Here, document-verification-pool is the name of the work pool as it’ll appear in your push work pools list in Prefect. The --provision-infra flag means that Prefect will automatically provision necessary infrastructure and create a new push work pool.

After asking whether you want to customize your resource names, Prefect will describe the resources it’ll create in your account and then ask if you want to create them. These include IAM roles and users for managing ECS tasks, an IAM policy, an Amazon Elastic Container Registry (ECR) repository to store your container images, and an ECS cluster for running your images.

Create a deployment for our flow

Now that you’ve created a work pool, you need to create a deployment for your flow. While flows can be run locally (e.g., from your dev machine), you need to deploy a flow to Prefect Cloud in order to trigger it from an external process via a webhook.

First, define a Dockerfile that builds a customized Docker image to run your code. Save the following code to a file named Dockerfile in the same directory as your flow code:

# We're using the latest version of Prefect with Python 3.10
FROM prefecthq/prefect:2-python3.10
# Add our requirements.txt file to the image and install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --trusted-host pypi.python.org --no-cache-dir

This Dockerfile uses the base Prefect image and adds a couple of commands to install the packages your Python code depends on. Before building it, create a requirements.txt file in the same directory listing those dependencies, one per line:
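
boto3
requests
psycopg2-binary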

Next, replace the __main__ block in document_verification.py with the following code, which creates a Docker-based deployment:

from prefect.deployments import DeploymentImage

...

if __name__ == "__main__":
    document_verification.deploy(
        name="document-verification-deployment",
        work_pool_name="document-verification-pool",
        image=DeploymentImage(
            name="prefect-flows:latest",
            platform="linux/amd64",
            dockerfile="Dockerfile"
        )
    )

Run this file on the command line using: python document_verification.py

You should see output from Prefect as it builds the Docker image, pushes it to your container registry, and registers the deployment.

Let me pause and explain, because there’s a little magic happening here.

When you ran the prefect work-pool create command above, Prefect created both an Amazon ECS cluster and an Amazon ECR repository on your behalf. It also automatically logged you into that ECR repository and made it the default registry for Docker container builds and pushes.

Note: If your login to your ECR repository expires (which it will, eventually), you can use the standard get-login-password AWS CLI command to refresh it: aws ecr get-login-password --region aws-region | docker login --username AWS --password-stdin aws-account-number.dkr.ecr.aws-region.amazonaws.com

In the Python code you added, the deploy() method creates a new deployment using the DeploymentImage class. It builds the image on your behalf, pushes it to the ECR repository in your AWS account, and makes it the target container image for your deployment. Whenever you trigger this deployment, it’ll run your flow code in an instance of this Docker image.

The end result is that you should see a new deployment in the Prefect Cloud UI.
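
If you want to kick off a test run before wiring up the webhook, you can also trigger the deployment directly from the CLI with prefect deployment run 'document-verification/document-verification-deployment' (assuming the default flow name that Prefect derives from your function name).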

Create an automation

Now that you’ve deployed the flow and set up a push work pool, you need a way to start a flow run. You can use a Prefect webhook to receive an SNS notification whenever a process uploads a new file to a designated S3 bucket.

You can create a webhook using the CLI: prefect cloud webhook create document-verification-webhook --description "Receives webhooks from the document verification" --template '{"event": "notification.received", "resource": {"prefect.resource.id": "bucket.file.{{body.file_key}}"}}'

This creates a webhook with the name document-verification-webhook, a description, and a template. The template emits an event named notification.received and pulls a file_key parameter from the body of the HTTP request.

Prefect will generate a unique URL for your webhook. You can subscribe this URL to an SNS topic in AWS, and then set this topic to receive event notifications from an S3 bucket whenever any process creates a new S3 object.
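
Before wiring up SNS, you can sanity-check the webhook and template with a quick request - a minimal sketch, assuming a hypothetical webhook URL and file key:

import requests

# Hypothetical values: substitute the URL Prefect generated for your webhook
webhook_url = "https://api.prefect.cloud/hooks/your-webhook-slug"

# The webhook template reads file_key from the request body, so any JSON body
# with that field produces a bucket.file.<file_key> event in Prefect Cloud
response = requests.post(webhook_url, json={"file_key": "uploads/drivers-license-123.png"})
print(response.status_code)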

To associate this webhook with your deployment, you need to create an automation. Log in to the Prefect Cloud UI, go to Automations, and create a new automation with a Trigger Type of Custom. Then, fill in the following JSON configuration. The match field tells Prefect to look at the event data sent by the external application; because the posture is Reactive with a threshold of 1, a single matching event is enough to invoke your deployment:

{
  "match": {
    "prefect.resource.id": "bucket.file.*"
  },
  "match_related": {},
  "after": [],
  "expect": [],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Reactive",
  "threshold": 1,
  "within": 0
}

Select Next. On the next screen, in Action Type, select Run a deployment, and then select the deployment you created above.

Finally, select Next, give your automation a name (e.g., document-verification-automation), and save it. Now, every call to the webhook will run the document processing flow on the newly uploaded S3 document!
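
The decision-communication flow from step 6 follows the same pattern - a second webhook feeding a second deployment. Here's a minimal sketch of what that flow might look like, assuming a hypothetical table schema and Amazon SES with a verified sender address for the email notification:

import boto3
import psycopg2
from prefect import flow, task

@task
def update_application_status(db_connection_string, application_id, decision):
    conn = psycopg2.connect(db_connection_string)
    cur = conn.cursor()
    # Hypothetical schema: adjust table and column names to match your database
    cur.execute(
        "UPDATE loan_applications SET status = %s WHERE id = %s",
        (decision, application_id),
    )
    conn.commit()
    cur.close()
    conn.close()

@task
def email_applicant(email_address, decision):
    # Assumes Amazon SES with a verified sender address
    ses = boto3.client("ses")
    ses.send_email(
        Source="loans@example.com",
        Destination={"ToAddresses": [email_address]},
        Message={
            "Subject": {"Data": "Update on your loan application"},
            "Body": {"Text": {"Data": f"Your application has been {decision}."}},
        },
    )

@flow
def decision_communication(application_id, decision, email_address, db_connection_string):
    update_application_status(db_connection_string, application_id, decision)
    email_applicant(email_address, decision)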

Final thoughts

I’ve shown how, using Prefect, you can create ephemeral infrastructure to handle background tasks and task-like microservices. By moving this microservices work into a workflow orchestration platform like Prefect, you can deploy it all from a single, unified location, where you can easily respond to and diagnose failures. This level of monitoring is indispensable for ensuring work completes, rather than simply being “fired and forgotten”.

Prefect makes complex workflows simpler, not harder. Try Prefect Cloud for free for yourself, download our open source package, join our Slack community, or talk to one of our engineers to learn more.