
Decomposed Durability for Data Workflows

Adam Azzam
VP Product

Your ML training job crashed at epoch 47. Your ETL pipeline died mid-extraction. Two scheduled workflows both tried to process the same batch of data, and now you're not sure which result to trust.

Durable execution systems exist to solve these problems. They persist state so you can recover from failures, and they coordinate concurrent execution so work happens exactly once. But the way a system achieves durability shapes what's possible with it.

The dominant paradigm ties durability to workflow identity. Each workflow execution maintains its own event history, and that history is the source of truth for recovering state. This model works well for long-running business processes where the workflow itself is the unit of work.

But data systems often have different requirements. The unit of work isn't always the workflow. Sometimes it's a transformation that multiple workflows need. Sometimes it's a batch that shouldn't be processed twice regardless of which workflow triggered it. Sometimes completely independent pipelines should share results because they're computing the same thing.

Prefect takes a different approach: durability primitives that are decomposed from workflow identity. Results are addressable by cache key, not by which workflow produced them. Locks coordinate across any execution that computes the same key. Transactions group work into atomic units that can span different contexts.

This decomposition enables patterns that workflow-scoped durability can't express cleanly.

Results in Storage You Control

When a Prefect task completes, its return value goes to an object store that you own: S3, GCS, Azure Blob, or local filesystem. These are files you control, readable with other tools, usable as the foundation for data catalogs, accessible outside of Prefect entirely.
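
For example, here's a minimal sketch of pointing a task's persisted results at an S3 bucket, assuming the prefect-aws integration and a previously saved S3Bucket block named my-results (both names are illustrative):

from prefect import task
from prefect_aws import S3Bucket

# Load a previously saved S3Bucket block; the block name is an assumption.
result_storage = S3Bucket.load("my-results")

@task(persist_result=True, result_storage=result_storage)
def transform(x: int) -> int:
    # The return value is serialized and written to the configured bucket.
    return x * 2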

The location of each result is determined by a cache key. By default, the key includes the task's inputs, source code, and the current flow run ID. But cache policies are composable, so you can change what the key captures.

from prefect import flow, task
from prefect.cache_policies import INPUTS, TASK_SOURCE
 
# Default: results scoped to this flow run
@task(persist_result=True)
def scoped_to_run(x: int) -> int:
    return expensive_computation(x)
 
# INPUTS only: results shared across ANY flow that calls this with same input
@task(cache_policy=INPUTS, persist_result=True)
def shared_across_flows(x: int) -> int:
    return expensive_computation(x)
 
# INPUTS + TASK_SOURCE: shared across flows, invalidated if code changes
@task(cache_policy=INPUTS + TASK_SOURCE, persist_result=True)
def shared_with_code_versioning(x: int) -> int:
    return expensive_computation(x)

The second and third variants are where composability matters. These tasks produce results that aren't tied to any particular workflow execution. If Flow A calls shared_across_flows(42) on Monday, and Flow B calls shared_across_flows(42) on Tuesday, Flow B gets the cached result. They don't need to be parent and child. They don't need to know about each other. They just need to compute the same cache key.

The result lives in your S3 bucket, at a path derived from the computation itself. You could read it with boto3. You could build a feature store on top of it. You could query it from a notebook that has nothing to do with Prefect. The orchestrator doesn't own your data.
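
To make that concrete, here's a minimal sketch of reading a persisted result back with boto3 and nothing else. The bucket name and key are stand-ins: the real path comes from your result storage configuration and the task's cache key, and how you decode the payload depends on the result serializer you chose.

import boto3

# Bucket and key are placeholders; substitute the location your
# result storage and cache key actually resolve to.
s3 = boto3.client("s3")
obj = s3.get_object(Bucket="my-prefect-results", Key="results/<cache-key>")
payload = obj["Body"].read()

# Decoding depends on the configured result serializer; a JSON serializer,
# for instance, yields a plain JSON document you can load anywhere.
print(payload[:200])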

Exactly-Once Across Workflows

Addressable results solve the "resume after failure" problem, but distributed systems have another challenge: concurrent execution. What happens when multiple workers try to perform the same computation simultaneously? The interesting case goes beyond retries within a single workflow. Independent workflows might need the same work done at the same time.

Consider: you have a daily pipeline that processes customer data, and an ad-hoc analytics workflow that a data scientist triggers manually. Both need to compute the same expensive aggregation. Without coordination, they might both execute it, wasting compute and potentially producing inconsistent results if the underlying data changed between executions.

Prefect handles this through cache isolation levels and lock managers. You can configure SERIALIZABLE isolation with a distributed lock:

from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.transactions import IsolationLevel
from prefect_redis import RedisLockManager
 
cache_policy = (INPUTS + TASK_SOURCE).configure(
    isolation_level=IsolationLevel.SERIALIZABLE,
    lock_manager=RedisLockManager(host="redis.example.com"),
)
 
@task(cache_policy=cache_policy, persist_result=True)
def aggregate_customer_metrics(date: str) -> dict:
    # First caller acquires lock, executes, writes result
    # Any other caller (from ANY workflow) waits, then reads from cache
    return expensive_aggregation(date)

The lock manager coordinates access based on the cache key, not the workflow identity. If the daily pipeline and the ad-hoc workflow both call aggregate_customer_metrics("2024-01-15") at the same time, one acquires the lock and computes the result. The other waits, then reads the cached result. They could be running on different machines, triggered by different schedules, managed by different teams. The coordination happens at the computation level.

This gives you exactly-once execution that spans workflow boundaries. The lock manager handles distributed coordination; the cache provides the durable result. Prefect provides lock managers for different contexts: MemoryLockManager for threads, FileSystemLockManager for processes on one machine, and RedisLockManager for distributed execution.
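
Swapping lock managers is a one-line change to the cache policy, not to the task. A sketch for local development, where in-process locking is enough:

from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.locking.memory import MemoryLockManager
from prefect.transactions import IsolationLevel

# Same policy as above, coordinated in-process instead of through Redis.
# FileSystemLockManager (prefect.locking.filesystem) covers multiple
# processes on one machine; RedisLockManager covers distributed workers.
local_policy = (INPUTS + TASK_SOURCE).configure(
    isolation_level=IsolationLevel.SERIALIZABLE,
    lock_manager=MemoryLockManager(),
)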

Transactions and Sagas

Sometimes you need a group of operations to succeed or fail together. This is the classic saga pattern: a sequence of steps where failure at any point should trigger compensating actions for the steps that already completed.

Prefect supports transactional semantics with rollback hooks:

from prefect import flow, task
from prefect.transactions import transaction
 
@task
def reserve_inventory(order_id: str, items: list) -> str:
    reservation_id = inventory_service.reserve(items)
    return reservation_id
 
@reserve_inventory.on_rollback
def release_inventory(txn):
    inventory_service.release(txn.get("reservation_id"))
 
@task
def charge_payment(order_id: str, amount: float) -> str:
    charge_id = payment_service.charge(amount)
    return charge_id
 
@charge_payment.on_rollback
def refund_payment(txn):
    payment_service.refund(txn.get("charge_id"))
 
@task
def schedule_shipment(order_id: str, reservation_id: str) -> None:
    shipping_service.schedule(order_id, reservation_id)
 
@flow
def process_order(order_id: str, items: list, amount: float):
    with transaction() as txn:
        reservation_id = reserve_inventory(order_id, items)
        txn.set("reservation_id", reservation_id)
 
        charge_id = charge_payment(order_id, amount)
        txn.set("charge_id", charge_id)
 
        schedule_shipment(order_id, reservation_id)

If schedule_shipment fails, the rollback hooks fire in reverse order: payment gets refunded, inventory gets released. The saga's compensation logic lives right next to the tasks it compensates, not in a separate error handler or state machine.

Tasks within a transaction don't commit their cache records until the entire transaction succeeds. This means a re-run after failure will re-execute all tasks in the transaction, maintaining consistency. The transaction gives you atomic semantics; the rollback hooks give you saga-style compensation.
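
Transactions can also take an explicit key, which turns the commit itself into a durable, addressable record. Here's a sketch of using that for idempotent side effects; the key format and reporting_service are illustrative assumptions:

from prefect import flow, task
from prefect.transactions import transaction

@task
def publish_report(order_id: str) -> None:
    reporting_service.publish(order_id)  # hypothetical side effect

@flow
def publish_once(order_id: str):
    # The key addresses this unit of work, not the flow run that does it.
    with transaction(key=f"report-{order_id}") as txn:
        if txn.is_committed():
            # Some earlier run, from any flow, already committed this key.
            return
        publish_report(order_id)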

Code Separated from Compute

Traditional durable execution assumes static infrastructure. Workers run continuously, polling for tasks. When a job fails and retries, it runs on the same worker with the same resources.

Prefect decouples code from compute. Instead of deploying to static workers, you submit flows to infrastructure provisioned for that specific run. And you can choose that infrastructure dynamically based on runtime context:

from prefect import flow
from prefect.runtime import flow_run
from prefect_kubernetes import kubernetes
 
@flow
def process_data(dataset: str):
    return expensive_computation(dataset)
 
@flow
def resilient_pipeline(dataset: str):
    run_count = flow_run.run_count
 
    if run_count == 1:
        bound_flow = kubernetes(
            work_pool="standard-pool",
            job_variables={"memory": "4Gi"}
        )(process_data)
    else:
        bound_flow = kubernetes(
            work_pool="high-memory-pool",
            job_variables={"memory": "16Gi"}
        )(process_data)
 
    return bound_flow(dataset)

The retry doesn't hope for a different outcome on identical infrastructure. It provisions more memory because the failure suggested that's what's needed. Infrastructure becomes another runtime decision, not a deployment-time constraint.

Composable Primitives

The pieces described here are primitives that compose.

Cache policies are algebraic. INPUTS + TASK_SOURCE means "invalidate if inputs change OR code changes." INPUTS - "debug" means "ignore the debug parameter when computing the cache key." You build the caching semantics you need by combining simple pieces.
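
A sketch of that algebra in code; debug is just an illustrative parameter name:

from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE

# Invalidate when inputs or source code change, but ignore the debug flag.
@task(cache_policy=(INPUTS - "debug") + TASK_SOURCE, persist_result=True)
def featurize(x: int, debug: bool = False) -> int:
    if debug:
        print(f"featurizing {x}")
    return x * x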

Isolation levels are configurable per-task. One task might use READ_COMMITTED (fast, allows concurrent execution) while another uses SERIALIZABLE (coordinated, exactly-once). You don't pick a global mode for your whole system.

Lock managers are pluggable. MemoryLockManager for local development, FileSystemLockManager for single-machine parallelism, RedisLockManager for distributed execution. Swap them based on your infrastructure without changing task code.

Result storage is separate from orchestration. Prefect tracks that a task ran and whether it succeeded. The actual result lives in your object storage, accessible to anything that can read from S3. The orchestrator coordinates; it doesn't own your data.

Infrastructure is a runtime choice. The same flow can run on different work pools, different resource allocations, even different cloud providers. Choose at deployment time or at runtime based on what the computation needs.

This composability means you can dial in exactly the durability semantics you need. A task that computes expensive features shared across many pipelines gets INPUTS + TASK_SOURCE with SERIALIZABLE isolation. A task that should always re-run gets NO_CACHE. A task whose results only matter within a single flow run keeps the default policy, which scopes the cache key to the flow run ID. Same primitives, different combinations, different behaviors.

Workflow-scoped durability systems take a different approach: the workflow execution is the unit of durability, and everything flows through the event history. That model works well for long-running business processes where the workflow itself is the valuable artifact. But when the valuable artifacts are the results, when different workflows should share computation, when you want your data in storage you control rather than locked in an orchestrator's database, when you need dynamic infrastructure that scales with your computation rather than static workers polling for small tasks, decomposed primitives give you more to work with.

Prefect builds durable execution from these primitives: addressable results in object storage, composable cache policies, pluggable lock managers, transactions with rollback hooks, and dynamic infrastructure that separates code from compute. The result is durability designed for data systems: big computations, expensive resources, results that matter beyond any single workflow.

Try Prefect Cloud for free, explore the open source package, or join our Slack community to learn more.