Prefect 0.14.0: Improved Flow Run Configuration

We’re excited to announce the release of Prefect 0.14.0

Jim Crist-Harif | Senior Engineer

December 17, 2020

We’re excited to announce the release of Prefect 0.14.0! This release is the result of months of work to improve the flexibility and ease of configuring and deploying Prefect flows. In particular, 0.14.0 contains a new system for configuring where and how a flow run is deployed on users’ infrastructure.
For the full list of enhancements and features, check out the release changelog.
Prefect strives to be a one-stop shop for writing and deploying workflows. Thanks to our Hybrid Model, all user code and data remain secure on the user's own infrastructure, while workflows are still orchestrated by Prefect Cloud (or the self-hosted Prefect Server).
We do this through the use of Agents — lightweight processes that run inside a user’s infrastructure. An agent polls the Prefect backend for any new flow runs, and starts them on the platform of choice. Prefect ships with several different types of Agents for different deployment platforms (Kubernetes, Docker, AWS ECS, …).
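As a rough illustration (a minimal sketch using the LocalAgent class from prefect.agent.local; the name and labels values are placeholders), an agent can be started directly from Python:
from prefect.agent.local import LocalAgent

# A Local Agent polls the Prefect backend for scheduled flow runs
# and executes each one as a subprocess on this machine.
agent = LocalAgent(name="example-agent", labels=["dev"])
agent.start()  # blocks, polling for work until interrupted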
To allow users to customize a flow run, we originally added the concept of environments. Environment objects let users customize the “job” started by the agent, adding flow-specific configurations. For example, a user could customize the Kubernetes Job for a flow run using the KubernetesJobEnvironment:
from prefect import Flow
from prefect.environments.execution import KubernetesJobEnvironment
from prefect.environments.storage import Docker

with Flow("kubernetes-example") as flow:
    # Add tasks to flow here...
    ...

# Run on Kubernetes using a custom job specification.
# This was needed to do even simple things like increase
# the job resource limits.
flow.environment = KubernetesJobEnvironment(job_spec_file="job_spec.yaml")

# Store the flow in a docker image
flow.storage = Docker()
Environmental Degradation
Customizing flow runs using Environment objects worked well enough, but was not without problems:
  • They were too flexible. Since each Environment type could work with each agent type, it wasn’t always clear to users where and how to customize things. For example, if running on Kubernetes, do you customize the Job resources by changing a setting on the agent, or through the use of a KubernetesJobEnvironment? (Both could work in certain cases.) Many users rightfully wanted one “right” way to do things, something the existing system couldn’t provide.
  • At the same time, they weren’t flexible enough. Environment objects could contain custom user code, and as such couldn’t be stored in the Prefect Backend (due to our secure Hybrid Model); they had to be stored in the user’s infrastructure. This often led to contortions to maintain this separation. For example, if a Kubernetes Agent executed a flow using a KubernetesJobEnvironment, two jobs were launched: an initial job to load the flow’s environment, and a second to launch the actual flow run job using that environment. This tripped up many users, both new and experienced.
  • They led to tight coupling. Prefect can optionally run on Dask, and as such there were a few Dask-specific environments (e.g. DaskKubernetesEnvironment for deploying Dask on Kubernetes). This worked well when it handled a user’s needs, but inevitably a user would ask for a Dask option that we didn’t expose, forcing us to mirror each Dask option in our own environment classes. Further, Dask supports deployment on many different platforms; creating an Environment class for each one would lead to an explosion of Dask environments, something we don’t have the bandwidth to maintain.
To support both the flexibility and ease of use our users wanted, we clearly needed to rethink this design.
Flow Configuration 2.0
Prefect 0.14.0 is the result of this redesign. Environments have been deprecated and split into two different options:
  • RunConfigs: describe where and how a flow run should be executed.
  • Executors: describe where and how tasks in a flow run should be executed.
These options can be mixed and matched, allowing for greater flexibility in choosing where and how a flow run is deployed.
RunConfigs
RunConfig objects define where and how a flow run should be executed. Each RunConfig type has a corresponding Prefect Agent (e.g. LocalRun pairs with a Local Agent, DockerRun pairs with a Docker Agent, ...). The options available on a RunConfig depend on the type, but generally include options for setting environment variables, configuring resources (CPU/memory), or selecting a Docker image to use (if not using Docker storage).
For example, to configure a flow to run on Kubernetes:
from prefect import Flow
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker

with Flow("kubernetes-example") as flow:
    # Add tasks to flow here...
    ...

# Run on Kubernetes with a custom resource configuration
flow.run_config = KubernetesRun(cpu_request=2, memory_request="4Gi")

# Store the flow in a docker image
flow.storage = Docker()
Unlike environments, run-configs are lightweight JSON-able objects that contain no user-defined code. This means they’re safe to store in the Prefect Backend and load inside the Agents, removing the need for any intermediate jobs and allowing for greater flexibility.
It also means we can display useful information about them in the UI!
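For example (a rough sketch; the exact parameters vary by RunConfig type, and the image, environment variables, and labels shown here are placeholders), a run-config carries nothing but plain settings:
from prefect.run_configs import KubernetesRun

# Only plain, serializable data -- no user code -- so the backend can
# store and display it, and the agent can load it directly.
run_config = KubernetesRun(
    image="prefecthq/prefect:0.14.0",
    env={"PREFECT__LOGGING__LEVEL": "DEBUG"},
    cpu_request=2,
    memory_request="4Gi",
    labels=["prod"],
)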
Executors
Executors are responsible for executing tasks in a flow run. There are several different options, each with different performance characteristics. Choosing a good executor configuration can greatly improve your flow's performance.
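For instance (a minimal sketch assuming the prefect.executors module in 0.14.0), the default LocalExecutor runs tasks one at a time in the flow-runner process, while a LocalDaskExecutor can parallelize them across local threads or processes:
from prefect import Flow
from prefect.executors import LocalExecutor, LocalDaskExecutor

with Flow("executor-example") as flow:
    # Add tasks to flow here...
    ...

# Run tasks one at a time (the default behavior)
flow.executor = LocalExecutor()

# Or run tasks in parallel on a local thread pool
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)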
All Dask-related configuration has been moved to the DaskExecutor. Through the use of cluster_class and cluster_kwargs, users can configure and use any of the existing Dask deployment libraries, without requiring any special integrations with Prefect.
For example, to configure a flow to use a DaskExecutor connected to a Dask cluster running on AWS Fargate (managed by dask-cloudprovider):
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("daskcloudprovider-example") as flow:
    # Add tasks to flow here...
    ...

# Execute this flow on a Dask cluster deployed on AWS Fargate
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"image": "prefecthq/prefect", "n_workers": 5},
)
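A DaskExecutor can also connect to a Dask cluster you manage yourself via its address parameter, rather than creating a temporary cluster for each flow run (a brief sketch; the scheduler address below is a placeholder):
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("existing-cluster-example") as flow:
    ...

# Connect to an already-running Dask scheduler instead of
# spinning up a temporary cluster for each flow run
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")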
Examples
This separation of run-configs and executors lets users mix-and-match, allowing for all sorts of new deployment patterns.
For example, to deploy a flow
  • Stored on AWS S3
  • Using a Dask cluster on AWS Fargate
  • With the initial flow-runner process run locally by a Local Agent
you’d use S3 storage, a LocalRun run-config, and a DaskExecutor configured to use dask-cloudprovider:
from prefect import Flow
from prefect.storage import S3
from prefect.run_configs import LocalRun
from prefect.executors import DaskExecutor

with Flow("example") as flow:
    ...

flow.storage = S3("my-flows")
flow.run_config = LocalRun()
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"image": "prefecthq/prefect", "n_workers": 5},
)
Switching to the ECS Agent (so the initial flow-runner process also runs on AWS ECS) only requires swapping out LocalRun for ECSRun:
from prefect import Flow
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect.executors import DaskExecutor

with Flow("example") as flow:
    ...

flow.storage = S3("my-flows")
# Run the flow-runner job on ECS instead of locally
flow.run_config = ECSRun()
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"image": "prefecthq/prefect", "n_workers": 5},
)
If you then wanted to drop distributed execution and instead run a local Dask cluster in a larger container, you’d swap out the executor for a LocalDaskExecutor and configure CPU and memory on the ECSRun run-config.
from prefect import Flow
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect.executors import LocalDaskExecutor

with Flow("example") as flow:
    ...

flow.storage = S3("my-flows")
flow.run_config = ECSRun(cpu="2 vcpu", memory="4 GB")  # Increase resource limits
flow.executor = LocalDaskExecutor(num_workers=8)  # Use a local Dask cluster
This separation of concerns makes common patterns more flexible and composable, and opens up all sorts of possibilities. See the RunConfig and Executor documentation for information on available options for each platform, and examples of use.
Future Developments
The new configuration design opens up lots of possibilities for future improvements, several of which are already on our roadmap.
Note that while environments are deprecated, they won’t be removed for a long time. We care a lot about maintaining backwards compatibility — your existing flows should continue to run fine while you figure out a transition plan. We’ve written up some tips on upgrading here. While you’re at it, you might also want to read through our recently revamped deployment tutorial.
There were plenty of other fixes and enhancements added in this release (many by our excellent community members), for a full list see the 0.14.0 release notes.
We are excited to keep improving the open source platform based on the community’s feedback, and can’t wait to see what you build next!
Please continue reaching out to us with your questions and feedback — we appreciate the opportunity to work with all of you!
Happy Engineering!
— The Prefect Team