Prefect 0.14.0: Improved Flow Run Configuration
Dec. 17, 2020


Jim Crist-Harif, Senior Engineer

We’re excited to announce the release of Prefect 0.14.0! This release is the result of months of work to improve the flexibility and ease of configuring and deploying Prefect flows. In particular, 0.14.0 contains a new system for configuring where and how a flow run is deployed on users’ infrastructure.

For the full list of enhancements and features, check out the release changelog.

Prefect strives to be a one-stop-shop for writing and deploying workflows. Thanks to our Hybrid Model, all user code and data remains secure on their infrastructure, while still allowing workflows to be orchestrated by Prefect Cloud (or the self-hosted Prefect Server).

We do this through the use of Agents — lightweight processes that run inside a user’s infrastructure. An agent polls the Prefect backend for any new flow runs, and starts them on the platform of choice. Prefect ships with several different types of Agents for different deployment platforms (Kubernetes, Docker, AWS ECS, …).
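The agent pattern described above can be sketched in a few lines of plain Python. Everything here is illustrative — `FakeBackend`, `pop_pending_runs`, and `agent_loop` are hypothetical names for this sketch, not the real Prefect agent API:

```python
# Minimal sketch of the agent pattern: poll a backend for pending flow
# runs and launch each one on the local platform. Hypothetical names,
# not the real Prefect implementation.

def agent_loop(backend, launch, iterations=3):
    """Poll the backend a few times, launching any runs found."""
    launched = []
    for _ in range(iterations):
        for run in backend.pop_pending_runs():
            launched.append(launch(run))
    return launched

class FakeBackend:
    """Stand-in for the Prefect backend's work queue."""
    def __init__(self, runs):
        self._runs = list(runs)

    def pop_pending_runs(self):
        runs, self._runs = self._runs, []
        return runs

backend = FakeBackend(["flow-run-1", "flow-run-2"])
results = agent_loop(backend, launch=lambda run: f"started {run}")
```

The key property, mirrored by real agents, is that the backend never reaches into user infrastructure; the agent only ever makes outbound polls.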

To allow users to customize a flow run, we originally added the concept of environments. Environment objects let users customize the “job” started by the agent, adding flow-specific configuration. For example, a user could customize the Kubernetes Job for a flow run using the KubernetesJobEnvironment:

from prefect import Flow
from prefect.environments.execution import KubernetesJobEnvironment
from prefect.environments.storage import Docker

with Flow("kubernetes-example") as flow:
    # Add tasks to flow here...
    ...

# Run on Kubernetes using a custom job specification.
# This was needed to do even simple things like increase
# the job resource limits.
flow.environment = KubernetesJobEnvironment(job_spec_file="job_spec.yaml")

# Store the flow in a docker image
flow.storage = Docker()

Customizing flow runs using Environment objects worked well enough, but was not without problems:

  • They were too flexible. Since each Environment type could work with each agent type, it wasn’t always clear to users where and how to customize things. For example, if running on Kubernetes, do you customize the Job resources by changing a setting on the agent, or through use of a KubernetesJobEnvironment? (Both could work in certain cases.) Many users rightfully wanted one “right” way to do things, something the existing system couldn’t provide.

  • At the same time they weren’t flexible enough. Environment objects could contain custom user code, and as such couldn’t be stored inside the Prefect Backend (due to our Hybrid Model, which keeps user code on user infrastructure) — they had to be stored in the user’s infrastructure. This often led to contortions to maintain this separation. For example, if using a Kubernetes Agent to execute a flow using a KubernetesJobEnvironment, two jobs would be launched — an initial job to load the flow’s environment, and a second to launch the actual flow run job using that environment. This tripped up many users, both new and experienced.

  • They led to tight coupling. Prefect can optionally run on Dask; as such, there were a few Dask-specific environments (e.g. DaskKubernetesEnvironment for deploying Dask on Kubernetes). This worked well when it handled a user’s needs, but inevitably a user would ask for a Dask option that we didn’t expose, forcing us to mirror each Dask option in our own environment classes. Further, Dask supports deployment on many platforms. Creating an Environment class for each platform would lead to an explosion of Dask environments, something we don’t have the bandwidth to maintain.

To support both the flexibility and ease of use our users wanted, we clearly needed to rethink this design.

Prefect 0.14.0 is the result of this redesign. Environments have been deprecated and split into two different options:

  • RunConfigs: describe where and how a flow run should be executed.

  • Executors: describe where and how tasks in a flow run should be executed.

These options can be mixed and matched, allowing for greater flexibility in choosing where and how a flow run is deployed.

RunConfig objects define where and how a flow run should be executed. Each RunConfig type has a corresponding Prefect Agent (i.e. LocalRun pairs with a Local Agent, DockerRun pairs with a Docker Agent, ...). The options available on a RunConfig depend on the type, but generally include options for setting environment variables, configuring resources (CPU/memory), or selecting a docker image to use (if not using Docker storage).

For example, to configure a flow to run on Kubernetes:

from prefect import Flow
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker

with Flow("kubernetes-example") as flow:
    # Add tasks to flow here...
    ...

# Run on Kubernetes with a custom resource configuration
flow.run_config = KubernetesRun(cpu_request=2, memory_request="4Gi")

# Store the flow in a docker image
flow.storage = Docker()

Unlike environments, run-configs are lightweight JSON-able objects that contain no user-defined code. This means they’re safe to store in the Prefect Backend and load inside the Agents, removing the need for any intermediate jobs and allowing for greater flexibility.
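To make the “lightweight JSON-able” point concrete, here is a minimal sketch of the idea in plain Python — a hypothetical run-config represented as pure data, not Prefect’s actual serialization machinery:

```python
import json
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class RunConfigSketch:
    # Hypothetical fields; the real KubernetesRun exposes more options
    cpu_request: Optional[int] = None
    memory_request: Optional[str] = None
    image: Optional[str] = None

cfg = RunConfigSketch(cpu_request=2, memory_request="4Gi")

# Because no user code is involved, the config round-trips through
# JSON and can safely live in the backend until an agent picks it up
payload = json.dumps(asdict(cfg))
restored = RunConfigSketch(**json.loads(payload))
```

Since the agent only rehydrates plain data, no intermediate job is needed to load user code before launching the flow run.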

It also means we can display useful information about them in the UI!

[Screenshot: ECSRun in the Prefect Cloud UI]

Executors are responsible for executing tasks in a flow run. There are several different options, each with different performance characteristics. Choosing a good executor configuration can greatly improve your flow's performance.

All Dask-related configuration has been moved to the DaskExecutor. Through the use of cluster_class and cluster_kwargs, users can configure and use any of the existing Dask deployment libraries, without requiring any special integrations with Prefect.
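The cluster_class / cluster_kwargs pattern works because the executor only needs “something callable that builds a cluster”. Here is a rough sketch of how a string-or-callable argument like this can be resolved (an illustrative helper, not Prefect’s actual implementation):

```python
import importlib

def resolve_cluster_class(cluster_class):
    # Accept either a callable or a dotted-path string such as
    # "dask_cloudprovider.aws.FargateCluster" (illustrative helper)
    if isinstance(cluster_class, str):
        module_name, _, attr = cluster_class.rpartition(".")
        cluster_class = getattr(importlib.import_module(module_name), attr)
    return cluster_class

def make_cluster(cluster_class, cluster_kwargs):
    """Instantiate whatever cluster implementation the user named."""
    return resolve_cluster_class(cluster_class)(**cluster_kwargs)

# Demo with a stand-in cluster class (no Dask installation required)
class DummyCluster:
    def __init__(self, n_workers=1, image=None):
        self.n_workers = n_workers
        self.image = image

cluster = make_cluster(DummyCluster, {"n_workers": 5})
```

Because the deployment library is named by the user rather than wrapped by Prefect, any Dask cluster manager with a compatible constructor works without a dedicated integration.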

For example, to configure a flow to use a DaskExecutor connected to a Dask cluster running on AWS Fargate (managed by dask-cloudprovider):

from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("daskcloudprovider-example") as flow:
    # Add tasks to flow here...
    ...

# Execute this flow on a Dask cluster deployed on AWS Fargate
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"image": "prefecthq/prefect", "n_workers": 5},
)

This separation of run-configs and executors lets users mix-and-match, allowing for all sorts of new deployment patterns.

For example, to deploy a flow

  • Stored on AWS S3

  • Using a Dask cluster on AWS Fargate

  • With the initial flow-runner process run locally by a Local Agent

you’d use S3 storage, a LocalRun run-config, and a DaskExecutor configured to use dask-cloudprovider:

from prefect import Flow
from prefect.storage import S3
from prefect.run_configs import LocalRun
from prefect.executors import DaskExecutor

with Flow("example") as flow:
    ...

flow.storage = S3("my-flows")
flow.run_config = LocalRun()
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"image": "prefecthq/prefect", "n_workers": 5},
)

Switching to the ECS Agent (so the initial flow-runner process also runs on AWS ECS) only requires swapping out LocalRun for ECSRun:

from prefect import Flow
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect.executors import DaskExecutor

with Flow("example") as flow:
    ...

flow.storage = S3("my-flows")
flow.run_config = ECSRun()  # Run job on ECS instead of locally
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"image": "prefecthq/prefect", "n_workers": 5},
)

If you then wanted to drop distributed execution and move to running a local Dask cluster in a larger container, you’d swap out the executor for a LocalDaskExecutor, and configure CPU and memory on the ECSRun run-config:

from prefect import Flow
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect.executors import LocalDaskExecutor

with Flow("example") as flow:
    ...

flow.storage = S3("my-flows")
flow.run_config = ECSRun(cpu="2 vcpu", memory="4 GB")  # Increase resource limits
flow.executor = LocalDaskExecutor(num_workers=8)  # Use a local Dask cluster

This separation of concerns makes common patterns more flexible and composable, and opens up all sorts of possibilities. See the RunConfig and Executor documentation for information on available options for each platform, and examples of use.

The new configuration design opens up lots of possibilities for future improvements, several of which are already on the roadmap.

Note that while environments are deprecated, they won’t be removed for a long time. We care a lot about maintaining backwards compatibility — your existing flows should continue to run fine while you figure out a transition plan. We’ve written up some tips on upgrading here. While you’re at it, you might also want to read through our recently revamped deployment tutorial.

There were plenty of other fixes and enhancements added in this release (many by our excellent community members), for a full list see the 0.14.0 release notes.

We are excited to keep improving the open source platform based on the community’s feedback, and can’t wait to see what you build next!

Please continue reaching out to us with your questions and feedback — we appreciate the opportunity to work with all of you!

Happy Engineering!

— The Prefect Team
