The Implications of Scaling Airflow

And how Prefect addresses the downsides of Airflow at scale.

September 20, 2023
Sarah Krasnik Bedell
Director of Growth Marketing

This deep dive explores the downsides that emerge while scaling Airflow, and how they can be addressed, whether with Airflow workarounds or with Prefect. When one data pipeline turns into a full-blown data engineering stack with workflow orchestration at its core, new ramifications arise: handling different use cases for workflow times and triggers, onboarding new team members, and staying lean with infrastructure cost.

You can’t count to ten without starting at one, but once you start counting and get past the tricky teens, you can easily get to 100. Similarly, every data engineering team starts with “that one critical data pipeline”. The one pipeline expands to three, and before you know it you have dozens of pipelines running with varying scaling needs.

Broadly, Airflow is considered a workflow or data orchestrator. You can think of Airflow like a Celery wrapper with some bells—consider the CeleryExecutor as a starting point.

Technically, Airflow is composed of a few components:

  • DAG files: the actual workflows being run
  • Scheduler: responsible for scheduling and submitting work including individual tasks
  • Workers: where task code runs when it executes on remote infrastructure, as dispatched by an executor (like Celery or Kubernetes)
  • Webserver: the UI and entry point for all Airflow users
  • Database: the backend keeping track of workflow state

More importantly, the team deploying Airflow also inherits (or relies on someone to inherit) the following responsibilities:

  • The infrastructure on which all the above components run
  • Dependencies on external systems that aren’t owned by this team (like a warehouse)
  • Notifying dependent teams when workflows fail and debugging why they failed, including failures of external systems

When I led the data engineering team at Perpay, a small but mighty 50-person fintech startup based in Philadelphia, we had DAGs with hundreds of tasks, and that’s only counting internal analytics. We were already bottlenecked by my team holding not only the keys but also the internal knowledge of our Airflow instance.

All of Airflow’s components have to interact gracefully and have monitoring in place, and the business logic being orchestrated has to have an owner who knows enough about doing root cause analysis in Airflow. Now how would this type of bottleneck scale for companies with 100, 200, or 1000 employees all trying to use data to make better functional decisions?

With scale comes complexity, which an orchestrator should aim to decrease. Scaling Airflow is complex and a time-consuming effort for both new data engineers to learn and existing data engineers to maintain. Prefect was built to handle requirements for both simple local testing/onboarding in Python and handling diverse use cases between infrastructure and trigger criteria. But let me show you exactly what I mean.

Starting with the basics: a toy workflow with a realistic use case

Let’s start with something concrete. Imagine an order fulfillment workflow, which holds the logic for single order processing, with broad-stroke Python pseudocode. Adding Airflow @task and @dag decorators yields the following file:

import random
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@task
def get_backorder_status(order_id):
    # Lookup some order metadata placeholder
    return random.choice([True, False])

@task
def send_email(order_id):
    # Send email logic would go here
    print("send email")

@task
def create_order(order_id):
    # Order creation logic would go here
    print("create order")

@task
def notify_supply_chain_team(order_id):
    # Notify the supply chain team of the result for each order
    pass

@dag(
    start_date=datetime(2022, 8, 1),
    schedule_interval="@daily",
    tags=["taskflow"]
)
def fulfill_order(order_id=1):
    # Realistically you'd pass in an order_id as a parameter
    # Fetch some order metadata, fulfill if it's not backordered.
    # If it's backordered, send customer email notification
    backorder = get_backorder_status(order_id)
    if backorder:
        send_email(order_id)
    else:
        create_order(order_id)
    notify_supply_chain_team(order_id)


# The dag instance must be a top level key
fulfill_order_dag = fulfill_order()

An e-commerce company at the scale of needing a data engineer is likely processing more than one order at once. The following must be true for the logic specifically related to order processing:

  • Dependencies across functions. These should be logged and easily debugged by knowing which order is being processed per workflow run.
  • Handling multiple runs. Presumably we will have multiple orders being processed. We will need to either fetch all orders, or be able to pass parameters into the order processing workflow from somewhere else.

A workflow orchestrator should handle all data engineering and related work, not just this one use case. For that to be true we need to consider:

  • Varying infrastructure requirements. Any other workflow outside of the supply chain side of an e-commerce business will likely have different infrastructure requirements: product recommendation model training will need hefty libraries like numpy or sklearn and hefty memory requirements.
  • Varying trigger criteria. Workflows don’t have to be triggered by time. They can be event-driven, file-driven, or manual. Consider a world in which each order should be processed by this logic when it is placed, not one hour later.
  • Varying permissions by team. From an organizational perspective, different sub-teams would care about different workflows and require different permissions for subsets of workflows being run on the orchestrator. It’s never a good idea to give out admin keys across the board.

When a business scales, data engineering practices need to scale with it. In this context, the data engineering team needs to handle a broader set of use cases, dependencies, infrastructure size, workflow failure cases, and stakeholder teams. Let’s address these considerations one at a time.

How Airflow handles dependencies

First, a philosophical question: why would you split up work into separate tasks in the first place?

  • A monolithic workflow architecture relies solely on logs to debug when something goes wrong. This presents a big risk: simply forgetting to print a log statement could result in the complete inability to debug a critical failure.
  • Responding to failed tasks should mean having visibility into separate tasks to easily re-run downstream tasks only.
  • An orchestrator allows you to parallelize code through one to many task dependency relationships. Lean on existing infrastructure to parallelize work instead of learning complicated libraries like asyncio.

Now that we’ve established dependent functions should be separate tasks, let’s remind ourselves what the code order tells us in our toy workflow:

backorder = get_backorder_status(order_id)
if backorder:
    send_email(order_id)
else:
    create_order(order_id)
notify_supply_chain_team(order_id)

All functions are decorated as Airflow tasks, so we should be good on visibility. Writing out the order of operations would be something like: get_backorder_status >> [send_email or create_order] >> notify_supply_chain_team.

We glean this from reading the Python code top to bottom. If we could run this as a pure Python script, we would expect: get_backorder_status to run, then either send_email or create_order to run based on the branching logic, and after both of those are done and don’t raise exceptions, notify_supply_chain_team to run.
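To make that expectation concrete, here is a minimal plain-Python sketch of the same logic with the decorators stripped out. The `calls` list and the simulated `RuntimeError` are additions for illustration only; they are not part of the original workflow.

```python
calls = []  # records which steps actually ran (illustration only)

def get_backorder_status(order_id):
    calls.append("get_backorder_status")
    # Simulated failure, standing in for a broken metadata lookup
    raise RuntimeError("order lookup failed")

def send_email(order_id):
    calls.append("send_email")

def create_order(order_id):
    calls.append("create_order")

def notify_supply_chain_team(order_id):
    calls.append("notify_supply_chain_team")

def fulfill_order(order_id=1):
    backorder = get_backorder_status(order_id)
    if backorder:
        send_email(order_id)
    else:
        create_order(order_id)
    notify_supply_chain_team(order_id)

try:
    fulfill_order()
except RuntimeError as exc:
    print(f"stopped early: {exc}")

print(calls)  # only the first step ran
```

In plain Python, the exception propagates out of `fulfill_order` and nothing after the failing call executes. That is the behavior most Python developers would carry over as their mental model.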

When running the full DAG in Airflow, the webserver UI only lets you roughly deduce the dependencies, by looking at the order of task runs in the Gantt chart.

Red usually represents failure—and Airflow is no different in this way. The first task failed. But didn’t we say in Python, the following tasks shouldn’t have run?

🚨 Complexity #1: Airflow doesn’t honor native Python behavior with runtime order dependencies without adding Airflow-specific code.

When adding a raise Exception() clause in the get_backorder_status task, the downstream tasks still ran. It’s unclear why the backorder value was true, or why Airflow allowed behavior to occur that’s unintuitive for the average Python developer. Getting to the expected behavior is most certainly possible, but is something that a new engineer to Airflow has to spend time learning about.
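One likely explanation, sketched here as a toy model in plain Python: the function under @dag runs when the graph is being built, and calling a decorated task at that point returns a placeholder rather than a real value. The `ToyGraph` and `LazyResult` classes below are hypothetical stand-ins written for this post, not Airflow’s classes, and the sketch assumes the placeholder is truthy like any ordinary Python object.

```python
class LazyResult:
    """Placeholder handed back at graph-build time; it holds no real value."""
    def __init__(self, task_name):
        self.task_name = task_name

class ToyGraph:
    """Hypothetical registry that records tasks and edges while building."""
    def __init__(self):
        self.tasks = []
        self.edges = set()

    def task(self, fn):
        def register(*args):
            # The task body is NOT executed here; it is only registered.
            self.tasks.append(fn.__name__)
            # An edge exists only if a placeholder is passed as an argument.
            for arg in args:
                if isinstance(arg, LazyResult):
                    self.edges.add((arg.task_name, fn.__name__))
            return LazyResult(fn.__name__)
        return register

graph = ToyGraph()

@graph.task
def get_backorder_status(order_id):
    raise Exception("never raised while the graph is being built")

@graph.task
def send_email(order_id):
    print("send email")

@graph.task
def create_order(order_id):
    print("create order")

@graph.task
def notify_supply_chain_team(order_id):
    pass

def fulfill_order(order_id=1):
    backorder = get_backorder_status(order_id)
    if backorder:  # a placeholder object, so this branch is always taken
        send_email(order_id)
    else:
        create_order(order_id)
    notify_supply_chain_team(order_id)

fulfill_order()
print(graph.tasks)  # create_order was never registered
print(graph.edges)  # empty: no task consumes another task's output
```

Under these assumptions, the `if` never sees a runtime True or False, and because every task receives only `order_id`, no task depends on another. A failure in the first task therefore has nothing downstream to block.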

Remember that we are growing our data engineering team, and presumably junior engineers might only just be learning Airflow. A behavior of Airflow that requires onboarding and learning is something I like to call an “airflow-ism”.

☢️ Airflowism #1: Add extra task dependency syntax outside of Python runtime order.

The expected behavior can be achieved by any of the following:

Workflows will inevitably fail, and when they do, debugging them can be a dumpster fire. It doesn’t have to be, and we want to keep the size of this fire as small as possible.

How Airflow handles exceptions, validations, and debugging

This section is divided into two subsections (I know, stay with me here)—first when running our toy workflow as is, and second when getting closer to reality: running multiple orders, which throws a wrench into things.

Exceptions: the single order

I got a bit ahead of myself talking about exceptions in the context of dependencies, so let’s take a step back into the philosophical space again. What does a data engineer’s development cycle look like? Spoiler alert: it’s not so different from a software engineer’s development cycle.

  • Write code and run it locally. This is where syntax errors get resolved, and the workflow is run with some sort of test or mock data.
  • Test code in a more production-like context. Production for data engineers means more realistic data, more external dependencies, and a higher number of concurrent workflows (stress testing is important!).

Orchestration is often the bread and butter of data engineering development, but it should start with one simple use case. These steps remain for any Python code orchestrated in Airflow, with one quirk: it’s no longer just Python.

🚨 Complexity #2: Testing Airflow DAGs and tasks requires onboarding with a local Airflow setup.

Our toy workflow is a Python script with a few decorators, nothing more. We aren’t even introducing external dependencies yet—we are at the simplest of simplest possible examples.

When adding if __name__ == "__main__": fulfill_order() at the end of the DAG file and running the Python file from the terminal, nothing happens. We have one choice only: testing Airflow requires running a standalone Airflow instance on your local machine. This is hefty. At Perpay, we found this to be quite burdensome for both the analysts and data scientists that relied on orchestrated work in Airflow.

Now, to the actual testing portion where we flesh out any more errors: you’ll find errors first on the top section of the Airflow (webserver) UI when importing the DAG code. But you know what? If the Airflow DAG syntax is wrong, the DAG won’t load in the UI at all, making it that much harder to debug.

These errors don’t automagically update when you fix the code. The webserver is unfortunately more static than you’d expect. While certain pieces have auto-refresh, the main page doesn’t (yet—I sincerely hope they add it, considering this code was written with the most recent stable version of Airflow: 2.7).

☢️ Airflowism #2: Constantly reloading the webserver to check for DAG import errors.

A new engineer must know to look at the top of the webserver UI, and unfold the gray bar to see the actual traceback. When getting past Python syntax or Airflow syntax errors, running the DAG will yield logical errors. These logical errors are the harder of the two to debug—added context means added complexity, naturally.

Exceptions: more realistically running multiple orders

Consider a more realistic (but still pseudocode) example: fulfilling multiple orders. We can update our existing workflow by making the single order process a task, while making the bread and butter logic be fetching all outstanding orders.

The fulfill_order function used to be a DAG but now gets the @task decorator, and the @dag decorator is placed on a new function all_orders() which simulates fetching all outstanding orders.

@task
def fulfill_order(order_id=1):
    # Realistically you'd pass in an order_id as a parameter
    # Fetch some order metadata, fulfill if it's not backordered.
    # If it's backordered, send customer email notification
    backorder = get_backorder_status(order_id)
    if backorder:
        send_email(order_id)
    else:
        create_order(order_id)
    notify_supply_chain_team(order_id)

@dag(
    start_date=datetime(2022, 8, 1),
    schedule_interval="@daily",
    tags=["taskflow"]
)
def all_orders():
    # Logic here for fetching all open orders.
    for order_id in [1, 2, 3, 4, 5]:
        fulfill_order(order_id)

# The dag instance must be a top level key
all_orders_dag = all_orders()

Each of the fulfill_order() runs contains the other tasks we cared about before. Nothing has changed; we have just added another layer onto things. Let’s remind ourselves of the order we expect in the underlying logic:
get_backorder_status >> [send_email or create_order] >> notify_supply_chain_team.

We can get a bit crazy here and parallelize processing these orders, but we won’t even do that—we’ll take the simpler sequential approach. Well, it should be simpler, right?

When running this DAG that kicks off tasks in a for-loop, we actually add far more complexity than we anticipated. As always, workflows will inevitably fail at some point, and this will probably be a dumpster fire.

  • How do we understand which order was being run at the time of failure?
  • How do we debug and rerun the failed workflow for the specific order(s) failed?
  • How do we know in which nested task the failure occurred?

The short of it is this: raw logs will give us a lot of the answer, but not all of it. Reverting back to the key loophole of raw logs: they require a human to remember to write a print statement and the output has to be useful.
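To show what “remembering to log” means in practice, here is a small plain-Python sketch of a sequential loop that records the order and step for every failure. The `STEPS` list, the `run_all` helper, and the rule that order 3 fails are all assumptions made up for this illustration.

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger("orders")

def get_backorder_status(order_id):
    # Hypothetical failure: order 3 cannot be looked up
    if order_id == 3:
        raise RuntimeError("order lookup failed")
    return False

def create_order(order_id):
    pass

def notify_supply_chain_team(order_id):
    pass

# Simplified linear pipeline (the email branch is left out for brevity)
STEPS = [get_backorder_status, create_order, notify_supply_chain_team]

def run_all(order_ids):
    """Process orders one by one and collect which order and step failed."""
    failures = []
    for order_id in order_ids:
        for step in STEPS:
            try:
                step(order_id)
            except Exception as exc:
                # Without this line, the log would not say which order broke.
                log.error("order_id=%s step=%s error=%s",
                          order_id, step.__name__, exc)
                failures.append({"order_id": order_id,
                                 "step": step.__name__,
                                 "error": str(exc)})
                break  # skip the remaining steps for this order only
    return failures

failures = run_all([1, 2, 3, 4, 5])
retry_ids = [failure["order_id"] for failure in failures]
print(retry_ids)  # orders that would need a rerun
```

All three questions above are answered only because someone wrote the `log.error` call and the `failures` bookkeeping by hand. Drop either one and the information is gone.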

🚨 Complexity #3: Scaling to looped or concurrent processes makes it harder to debug workflows in Airflow.

In theory, this is what an orchestrator should be built for. Monolithic workflows are perfectly fine in something like Celery, but we adopt a more sophisticated solution with the hopes of getting a significantly more sophisticated set of features.

When running this more realistic multiple orders process we lose visibility into the underlying tasks and logic. In Airflow’s webserver UI, we can only go one level deep to see runtime, dependencies, and logs.

☢️ Airflowism #3: Logs are not centralized but rather found only in the task instance.

To understand why something failed, you have to dig around in specific task logs. When the nested structures aren’t exposed, this creates a more opaque layer between you and the reason your workflow failed.

There’s always a workaround—maybe not an elegant one, but a workaround nonetheless. You could:

  • Use SubDAGs, or better yet TaskGroups, to represent nested dependencies as a UI grouping concept. However, this requires a significant amount of added Airflow-specific code and dependency logic.
  • Dynamically create DAGs while iterating through orders. In this approach, visibility is lost into the higher level process generating the DAGs. Airflow documentation advises against this approach.

We didn’t even get into true concurrency on the infrastructure level, but just scratched the surface of the setup that’s needed to run more instances of a workflow. What about running more of different types of workflows?

How Airflow handles infrastructure requirements

Product recommendations are a natural part of any e-commerce business. If we can learn from the best, Amazon’s entire homepage is about a dozen different UI components of different types of recommendations.

Taking it back to our toy workflow—it doesn’t have many requirements. When building beyond the pseudocode, I’d imagine some database pings, basic Python packages (like random), and enough memory to process some batched set of orders (which itself is not a memory-intensive process). If we want to add a common product recommendation engine, we have a few new requirements:

  • Importing new libraries. Sklearn depends on numpy which depends on many other packages being installed, increasing the size of the image required to run this workflow.
  • Infrastructure for model training. This doesn’t necessarily scale linearly as the number of products scales.
  • Cross-functional and cross-system dependencies. Systems could include Databricks, and functions could include data science and ML teams that will be authoring these workflows.

Keeping us honest here—why should we care? Can’t we just build the maximal image required and be off to the races? We could, but the CFO won’t be too happy. Scaling infrastructure is a monetary and efficiency game, as infrastructure bills pile up fast. Finding the smallest image to run a job is preferred.

In Airflow, different image, memory, and other infra settings requirements can be handled by launching different workers. DAGs can be routed to a specific queue, which is assigned a specific worker. In this situation, we can separate ML DAGs from general data processing DAGs. Simple right?

🚨 Complexity #4: Airflow’s infrastructure is long-running and doesn’t handle bursty behavior well.

Workers are long-running containers. Taking the canonical Airflow docker-compose file, workers don’t stop by themselves. They run while Airflow is running, which in theory is all the time as work can always be orchestrated.

airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

Workers are truly long-running infrastructure if they are single containers or single instances. This can certainly be mitigated by running Airflow on something like a container service (ECS) or Kubernetes. With this approach, complexity grows to something that a platform team might be better positioned to be responsible for.

Still, we haven’t resolved all our problems. While Airflow can handle more workers, they are still single points of failure. As Shopify’s data engineering team eloquently states, Airflow still struggles with resources no matter what you do:

  • Difficult to consistently distribute load. Workflow schedules that overlap on the same time interval will always run at the same time, and some manual schedule adjustments have to occur.
  • Resource contention can’t be avoided. Airflow is bottlenecked by scheduler throughput, database capacity, and IP space.
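One common way to make those manual schedule adjustments less manual is to derive a stable offset from each workflow’s name. The sketch below is a generic illustration in plain Python, not a description of any team’s actual implementation; `minute_offset` and `staggered_hourly_cron` are hypothetical helper names.

```python
import hashlib

def minute_offset(dag_id: str) -> int:
    """Map a workflow name to a stable minute between 0 and 59."""
    digest = hashlib.sha256(dag_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % 60

def staggered_hourly_cron(dag_id: str) -> str:
    """Build an hourly cron expression that starts at the derived minute."""
    return f"{minute_offset(dag_id)} * * * *"

for name in ["fulfill_order", "train_recommendations", "refresh_dashboards"]:
    print(name, "->", staggered_hourly_cron(name))
```

Because the offset comes from a hash rather than a random number, the same workflow always gets the same schedule, while different workflows tend to spread across the hour instead of all starting at minute zero. Two names can still collide on the same minute, so this reduces overlap rather than eliminating it.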

Infrastructure is hard. Ideally, an orchestrator should abstract away as much infrastructure as possible when getting started, particularly when different teams are involved. Speaking of different teams: they would likely be responsible for different DAGs.

How Airflow handles permissions

Continuing with our e-commerce example, the core data engineering team and supply chain teams would have the most interest in the order processing flow. The data science team, however, would not only have the most interest in the product recommendation code but would also need authorship permissions for it.

We could solve this in a few different ways:

  • Multiple repos, one Airflow instance. Different teams could author code in different repositories with different permissions, but all push to one Airflow instance. Loophole: what if they need to do anything in the Airflow UI, how should permissions be delineated?
  • DAG-level permissions. If only this was easy in Airflow. You have to deal with the manifest file, namespaces, and get real creative.
  • Launching multiple Airflow instances for teams. While this is the most thorough solution, it has the most burden: maintaining even more infrastructure.

Shopify’s engineering team again summarizes these issues nicely: DAG authors have a lot of power while associating DAGs with users and teams is quite complex.

🚨 Complexity #5: Workflow organization in Airflow isn’t straightforward without supporting extra instances and infrastructure.

For any organization with any security burden (think healthcare, fintech, or any business selling to that type of organization, which is a lot of us), this is not only critical but fundamentally necessary.

The most thorough solution here is launching separate Airflow instances per team. Let’s rewind to all the components of Airflow—scheduler, webserver, database, workers, and now we have separate worker types for each instance too—this is all becoming quite complex. There’s a maintenance burden and tech debt that’s growing.

There’s something else we haven’t mentioned yet. So far, all these orders we’re talking about are being processed on a schedule—based on time. What if we need to fulfill orders as they’re placed?

How Airflow handles event-driven triggers

An order being placed is an event. Many retail companies, both large (Lowe’s) and small (Nuuly), rely on event streaming platforms like Kafka. Some data engineering workflows are perfectly fine staying batched—like training those product recommendation models. If the orchestrator is to be the heart of data engineering at an organization, it must handle both workflows.

There’s another Apache project worth mentioning here—Apache Beam. It’s built for running both batch and streaming processes, but has its complexities. Most specifically, Beam as compared to orchestrators has even more infrastructure burden and a steeper learning curve.

Let’s walk through what it could look like to have a workflow in Airflow that isn’t triggered by a schedule. In Airflow, we have a concept called Sensors. Sensors wait for something to occur—in the documentation, they can wait for an external event.

Airflow interacts with external events through hits to the API instance. This poses a few questions:

  • Who’s going to implement the API request to Airflow when an event happens?
  • What happens if Airflow is unhealthy and the API request throws an error in the external system?

Let me propose some answers:

  • New software engineering work is required. Airflow cannot hook into existing event systems like Kafka; instead, it relies on the systems managing Kafka events to also hit Airflow’s API.
  • Dependencies exist on the health of Airflow in other systems. We cannot avoid this dependency, and thus Airflow becomes a critical system to be observed like any other app owned by the core engineering team.

Therein lies the problem. While orchestration is usually the heart of data engineering work, Airflow in its current state adds significant complexity, tech debt, and “Airflow-isms” that make it hard to use for new team members and time-consuming to maintain by existing ones.

🚨 Complexity #6: Airflow cannot natively subscribe to existing event systems to trigger DAGs.

You may be wondering: isn’t that exactly what Sensors are supposed to do? Yes it is.

Airflow has a straightforward implementation of one type of event: a file added into a file system. This sensor, however, has to live within a DAG. The FileSensor can be used as a dependency for some tasks, but as built, not as a trigger for a DAG itself.

from airflow.sensors.filesystem import FileSensor

@dag(
    start_date=datetime(2022, 8, 1),
    schedule_interval="@daily",
    tags=["taskflow"]
)
def fulfill_order(order_id=1):
    ...
    FileSensor(
        task_id="filesensor",
        poke_interval=60,   # seconds between checks
        timeout=60 * 30,    # give up after 30 minutes
        mode="reschedule",
        filepath="./my_path.txt"
    )
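For readers new to sensors, the poke_interval and timeout settings above boil down to a polling loop. Here is a rough plain-Python approximation of that idea. It is not how FileSensor is implemented, and `wait_for_file` is a hypothetical helper that returns False on timeout instead of failing a task.

```python
import os
import tempfile
import time

def wait_for_file(filepath, poke_interval=60.0, timeout=1800.0,
                  sleep=time.sleep, clock=time.monotonic):
    """Poll until filepath exists or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        if os.path.exists(filepath):
            return True
        if clock() >= deadline:
            return False
        sleep(poke_interval)

# Usage with tiny intervals so the example finishes quickly
with tempfile.TemporaryDirectory() as workdir:
    present = os.path.join(workdir, "my_path.txt")
    missing = os.path.join(workdir, "never_created.txt")
    with open(present, "w") as handle:
        handle.write("order batch ready")

    found = wait_for_file(present, poke_interval=0.01, timeout=0.05)
    not_found = wait_for_file(missing, poke_interval=0.01, timeout=0.05)

print(found, not_found)
```

Note what the loop does and does not do: it blocks a step inside a run that has already started. It does not start the run, which is why the sensor works as a task dependency but not as a DAG trigger.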

Triggering a DAG itself must be done with an API call to Airflow’s endpoint, something like this from an external system:

curl -X POST "http://localhost:8080/api/v1/dags/fulfill_order/dagRuns" -H "Content-Type: application/json" -d '{"conf": {}}'

What happens when this post request fails?

  • The external system must handle the failure and continue with its processing
  • We must fetch the REST API history from Airflow to understand where failures occurred.
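To give a sense of the work this pushes onto the calling system, here is a minimal retry-with-backoff sketch in plain Python. Everything in it is hypothetical: `trigger_with_retry` is an illustrative helper, and the `post` argument stands in for whatever function actually sends the request to Airflow’s API.

```python
import time

def trigger_with_retry(post, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call post() until it succeeds, doubling the wait after each failure."""
    last_error = None
    for attempt in range(attempts):
        try:
            return post()
        except OSError as exc:  # connection-level errors subclass OSError
            last_error = exc
            if attempt < attempts - 1:
                sleep(base_delay * 2 ** attempt)
    raise RuntimeError(f"gave up after {attempts} attempts") from last_error

# Usage with a fake sender that fails twice, then succeeds
attempts_seen = []
delays = []

def flaky_post():
    attempts_seen.append(1)
    if len(attempts_seen) < 3:
        raise ConnectionError("Airflow API unavailable")
    return {"state": "queued"}

result = trigger_with_retry(flaky_post, sleep=delays.append)
print(result, len(attempts_seen), delays)
```

Even this small helper leaves open questions the external team now owns: what to do when the retries run out, and how to avoid triggering the same order twice if a request succeeded but the response was lost.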

This presents a heavy burden and handshake between the data engineering team (responsible for Airflow) and the software engineering team (hitting Airflow from external systems). While possible, it is not a scalable solution.

A more scalable orchestration solution: Prefect Cloud

If you’ve followed along to this point, I hope you see where Airflow has its holes. It is heavily use-case driven. Airflow is complicated for:

  • Small teams that need to onboard quickly and have minimal infrastructure requirements
  • Medium-sized teams that need to efficiently support many stakeholders
  • Large teams that need to securely orchestrate different types of workflows on varying infrastructure

To be specific, let’s recap Airflow’s complexities:

  1. Airflow doesn’t honor native Python behavior with runtime order dependencies without adding Airflow-specific code.
  2. Testing Airflow DAGs and tasks requires onboarding with a local Airflow setup.
  3. Scaling to looped or concurrent processes makes it harder to debug workflows in Airflow.
  4. Airflow’s infrastructure is long-running and doesn’t handle bursty behavior well.
  5. Workflow organization in Airflow isn’t straightforward without supporting extra instances and infrastructure.
  6. Airflow cannot natively subscribe to existing event systems to trigger DAGs.

We used Airflow at Perpay, and started with it before Prefect was really a thing. Airflow didn’t start out as the heart and soul of all data engineering work, but it quickly became that. As we scaled Airflow, we discovered the complexities listed above firsthand. It became hard to support the team without teaching them the Airflowisms mentioned here (and many that aren’t mentioned here).

If choosing an orchestration tool, consider what a future world would look like when the tool inevitably needs to respond to scale and has growing criticality.

With Prefect, the toy workflow looks something like this.

import random
from prefect import flow, task, get_run_logger
from datetime import datetime, timedelta

@task
def get_backorder_status(order_id):
    # Lookup some order metadata placeholder
    # Raise on purpose to show that downstream tasks do not run
    raise Exception()
    return random.choice([True, False])

@task
def send_email(order_id):
    # Send email logic would go here
    print("send email")

@task
def create_order(order_id):
    # Order creation logic would go here
    print("create order")

@task
def notify_supply_chain_team(order_id):
    # Notify the supply chain team of the result for each order
    pass

@flow
def fulfill_order(order_id=1):
    # Realistically you'd pass in an order_id as a parameter
    # Fetch some order metadata, fulfill if it's not backordered.
    # If it's backordered, send customer email notification
    backorder = get_backorder_status(order_id)
    if backorder:
        send_email(order_id)
    else:
        create_order(order_id)
    notify_supply_chain_team(order_id)


if __name__ == "__main__":
    fulfill_order()

Let’s address each complexity one by one:

1️⃣ Honoring native Python behavior. In Prefect, a raised Exception will stop any following task runs, even if those tasks are simply decorated Python functions running one after the other. Additionally, every Prefect flow can be run as a Python script by simply running python my_flow_file.py from your terminal. This means developing locally when using Prefect is much closer to the production version of those same workflows.

2️⃣ Clear exception debugging and validation. By running the workflow from your terminal, errors will show up in the logs where you’re already testing. In Prefect Cloud, the dashboard will give you a bird’s-eye view into the health of all the pieces of Prefect.

3️⃣ Gracefully building loop or concurrent processes. Flows within flows are not only possible but an encouraged part of Prefect. Subflows will still have task logs and dependencies inside, all equally observable. Failures from any task will cause alarm (as they should), no matter how nested they are.

4️⃣ Efficiently scaling infrastructure where workflows actually run. In Prefect, workers poll for new work, and submit flow runs to the infrastructure corresponding to the worker type. Some worker types can even support serverless or auto-scaling infrastructure without scaling the number of workers. The efficiency is then passed on to your cloud bill, which isn’t necessarily the case with Airflow.

5️⃣ Code and permissions organization. First, we introduce the concept of workspaces. They are literally that: different spaces for different types of work, with different teams having access. Additionally, custom roles can be set up with granular permissions to different object types. An important note here: these are features of Prefect Cloud, not the open source library.

6️⃣ Event-driven workflows as a first class concept. Webhooks are hosted by Prefect. When using something like Confluent to host Kafka, a post can fire to a webhook whenever a Kafka message appears for a particular topic. The webhook creates an event in Prefect, which thus can trigger a flow to run. Flows deployed to run from events and those that run on a schedule are equally observable and actionable.

What’s next?

No matter if you’re new to orchestration or already an avid Airflow user, I hope I’ve shown you at least what to watch out for with Airflow. We here at Prefect can help find an alternative to the complexity you’ll inevitably see with Airflow—complexity comes with scale, and an orchestrator should mitigate that complexity.

Rent the Runway chose Prefect for this exact reason. Why don’t you check out our story?

Prefect makes complex workflows simpler, not harder. Try Prefect Cloud for free for yourself, download our open source package, join our Slack community, or talk to one of our engineers to learn more.