Prefect 0.15.0: A New Flow Run Experience
blog

Prefect 0.15.0: A New Flow Run Experience

An overhauled flow run CLI, sub-flow result passing, agentless execution, improved run observability, new auth patterns, and more!

Michael Adkins | Software Engineer

July 1, 2021

We’re excited to announce the release of Prefect 0.15.0! This release is the result of months of work to improve the interface for creating and inspecting flow runs.
For the full list of enhancements and features, check out the release changelog.

The core of Prefect is orchestrating the execution of your workflows. When using Prefect Cloud or Server, you indicate a workflow should be run by creating a "flow run". A flow run can be created from the UI, via a GraphQL call, from a task in another flow run, or using the CLI. Continuous improvements have been made to launching flow runs from the the web UI; log levels can be easily adjusted, configuration can be modified per run, and JSON flow parameters have syntax highlighting. With the UI, we can redesign something easily because you're not relying on the interface programmatically. With the Prefect Python library, we can break your code, so we're careful to limit our redesigns until there's strong evidence we are correct. In 0.15.0, we've added:
  • prefect run — a new CLI for running flows
  • create_flow_run and get_task_run_result tasks to create subflows and retrieve data from them
  • FlowRunView, TaskRunView, FlowView, and TenantView objects to pull data from the backend without writing GraphQL
  • simpler authentication using API keys
...and more, but there's only so much we can fit in a blog post!
A new CLI for running flows
We created a new CLI for running flows: prefect run.
prefect run is designed to make the transition from local execution to running with a Cloud or Server backend seamless. Previously, when developing, you needed to define your flow then call flow.run() at the bottom; then, to test the flow, you'd run python my_flow_file.py; then, when the flow is ready to be pushed to Cloud, you'd change your flow.run() call to flow.register("project") and go to the UI to start a flow run. With prefect register (introduced in 0.14.13) and prefect run, you can drop this cruft from your flow. Limiting your flow files to the definition of your flow and using the CLI to perform actions on that definition means you don't have to edit your flow to change how it is run.
prefect run will take an import or file path to run a flow locally; this replaces using flow.run(). For example, the following will run the file, extract the flow object, and call flow.run() for you:
$ prefect run -p my_flow_file.py
To transition this flow from local to Cloud, we can register it with an interface that feels the same; this replaces flow.register():
$ prefect register -p my_flow_file.py --project example
Before we continue, let's switch to a real flow. We've added a "Hello World" flow to Prefect to make getting started really easy.
First, try running the flow locally:
$ prefect run -m prefect.hello_world
undefined
Then, register the flow:
(you may need to create a project first with prefect create project 'example')
$ prefect register -m prefect.hello_world --project example
undefined
Now, a flow run can be created using the prefect run CLI:
$ prefect run --name "hello-world" --watch
undefined
What's with the --watch flag? When used with a backend, prefect run will typically exit immediately after creating a flow run since the execution of the flow run is managed by an agent. Above, we opted to watch the flow run which will stream logs and state changes from the flow run to your machine. This provides some instant feedback about how the flow run is going. Of course, we display a link to monitor the flow run from the UI and you can see all of the logs there as well.
What about the old command? prefect run flow will stick around for compatibility but is now deprecated. Creating a new command lets us change behavior and arguments without worrying about backwards compatibility.
Agent warnings
A common question is: Why is my flow run stuck in a 'Scheduled' state; why hasn't it started?
When prefect run --watch is used, we can help answer this question. If the flow run does not enter a 'Running' state 15 seconds after being created, we will investigate. Flow runs are matched with agents by "labels". The labels on an agent must include all of the labels on the flow run for the agent to pick up the flow run. We pull all of your agents from the backend and compare your labels to the flow run's labels to determine if there are any matching agents. If not, we'll tell you to start an agent with the required set of labels. If there is an agent with matching labels, we'll check the last time the agent contacted the backend to see if it's healthy and warn you if it is not. Combined with the new agent screens in the UI, we're improving communication where things can go wrong so you can quickly identify a solution and focus on your engineering tasks at hand.
Agentless execution
In 0.15.0, we have introduced a new concept of "agentless flow run execution". Using the prefect run CLI, you can execute a flow directly with the rich set of features enabled by communicating with the backend API. For example, you'll be able to see your flow run from the UI, your Cloud concurrency limits will be respected, and you'll have a context populated with real object ids. At the same time, you'll be able to set breakpoints in your tasks and debug your flow locally.
Using our hello-world flow from above, pass the --execute flag to prefect run to execute the flow run without an agent.
$ prefect run --name "hello-world" --execute
This works by:
  • Creating a flow run for you in the backend with an extra unique label so an agent does not pick up the flow run
  • Creating a subprocess including environment variables from your RunConfig (other infrastructure settings are ignored)
  • The subprocess pulls the flow from storage
  • The flow is executed with reporting to the backend
Agents add a layer to Prefect in which we help you manage the infrastructure your flow is going to be deployed on. Since the agent is removed from the picture here, you are taking full ownership of the infrastructure your flow runs on. If you want the flow to run in a container, you must set up the container yourself then call this command. This gives deep control over the execution environment and scheduling of flow runs to the users that need it
Sub-flow result passing
Since we introduced the ability to launch flows from a flow, users have been asking for a first-class way to pass task results between flows. In 0.15.0, we're releasing new tasks to create sub-flows and pull results from them with a very simple interface.
The existing task to create sub-flows, StartFlowRun, provides a wait flag which changes its return value from a flow run id to a state—this makes it hard to compose with other tasks. A much simpler create_flow_run task was introduced to create a child flow run. This task always returns a flow run id immediately after creation. We then added a get_task_run_result(flow_run_id: str, task_slug: str) task which will retrieve the result from the task in the given flow run.
For example, we can pull a result from a child flow into a parent flow then act on that data:
from typing import List

from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.tasks.prefect import create_flow_run, get_task_run_result

@task(result=LocalResult())
def create_some_data(length: int):
    return list(range(length))

with Flow("child") as child_flow:
    data_size = Parameter("data_size", default=5)
    data = create_some_data(data_size)

@task(log_stdout=True)
def transform_and_show(data: List[int]) -> List[int]:
    print(f"Got: {data!r}")
    new_data = [x + 1 for x in data]
    print(f"Created: {new_data!r}")
    return new_data

with Flow("parent") as parent_flow:
    child_run_id = create_flow_run(
        flow_name=child_flow.name, parameters=dict(data_size=10)
    )
    child_data = get_task_run_result(child_run_id, "create_some_data-1")
    transform_and_show(child_data)
To run this example, save it to a file parent-child-flow.py. Then we can register and run the parent
$ prefect register -p parent-child-flow.py --project example
$ prefect run --name "parent" --execute
undefined
Inspection of flow runs
The UI provides a wonderful view into your flow runs with minimal effort, we want to bring our Python library in-line with this ease. 0.15.0 creates a prefect.backend module with Python objects that provide views of the backend without requiring you to write any GraphQL queries. GraphQL is a powerful view into the Prefect backend, allowing you to retrieve exactly the data you want from nested objects, but often you just want to pull a typical set of data for a single object. Since flow runs are at the center of what Prefect does, we've focused on making it simple to pull useful data for a flow run.
A FlowRunView lets you pull the most recent information about a flow run from the backend. For example:
from prefect.backend import FlowRunView

# Create a new instance using an ID from the UI or from the
# output of the `prefect run` command
flow_run = FlowRunView.from_flow_run_id(
    "4c0101af-c6bb-4b96-8661-63a5bbfb5596"
)

# You now have access to information about the flow run
flow_run.state       # 
flow_run.labels      # {"foo"}
flow_run.parameters  # {"name": "Marvin"}
flow_run.run_config  # LocalRun(...)
flow_run.states      # [Scheduled(), Submitted(), Running(), Success()]

# You can also retrieve information about the flow
flow = flow_run.get_flow_metadata()  # FlowView(...)

# Or about a task run
task_run = flow_run.get_task_run(task_slug='say_hello-1')  # TaskRunView()
Notice that the FlowRunView provides methods to access two related structures: FlowView and TaskRunView. FlowView exposes the metadata that we store about a flow when it is registered. TaskRunView exposes metadata for a single run of a task in your flow.
A FlowView can also be directly instantiated with various lookup methods:
from prefect.backend import FlowView

flow_view = FlowView.from_flow_name(flow_name="hello-world")

# If a name is not unique, a project will need to be provided
flow_view = FlowView.from_flow_name(
    flow_name="hello-world", project_name="example"
)

# Or a lookup can be performed by id
flow_view = FlowView.from_flow_id("8a896c14-07ac-4538-bed1-162e188d780f")

# The view provides information about the flow
flow_view.run_config  # LocalRun(...)
flow_view.storage     # Module(...)
flow.core_version     # 0.15.0
flow.project_name     # "example"
A TaskRunView can also be instantiated directly, but it requires a flow run id so we recommend using FlowRunView().get_task_run(...):
from prefect import task
from prefect.backend import TaskRunView

# Presume we have a flow with the following task
@task
def foo():
  return "foobar!"

# Instantiate a task run directly
task_run = TaskRunView.from_task_slug("foo-1", flow_run_id="")

# As with the other views, we can inspect the task run
task_run.state      # Success(...)
task_run.map_index  # -1  (not mapped)

# We can also retrieve results
task_run.get_result()  # "foobar!"
The most powerful feature of the TaskRunView is its ability to pull results from their location on your infrastructure. This object does the heavy lifting of looking up the location of the result and pulling the data from it. It also handles mapped tasks which require the result to be pulled from many locations. Check out the task run view documentation for more details.
These backend objects were designed on conjunction with all of the features described above. We've leveraged their simple interface to build new, powerful features with minimal effort. Without these abstractions, the code for our new features would have a lot of repetition and complexity. We're excited to see what you can build with them as well
API keys for simple authentication
As Prefect has grown, we've discovered that user and enterprise requirements for authentication were not met by our API tokens. Behind the scenes, months of work was done to move us to a new form of authentication, API keys, that can meet the granular permissioning desires of our customers. We took this opportunity to rehaul the feel of authentication in the Prefect Python library.
An API key is tied directly to an account and has all of the permissions of that account. It can be used for both registering and running flows so you don't need to create one token to log in with and another to run your agents. As always, we don't want to break existing workflows so this CLI is fully backwards compatible and familiar looking, but it simplifies our authentication story letting you quickly get to running your workflows.
Getting started with an API key
If you've never used a token, getting going with an API key is simple. Head to the UI API key page and create a new key. Then open up a terminal and run prefect auth login --key "<YOUR-KEY>"
Congrats! You're logged in. Your key has now been saved to ~/.prefect/auth.toml and will be used for all future operations. Try starting an agent with prefect agent local start.
If your user belongs to multiple tenants, you can view them with prefect auth list-tenants and switch tenants with prefect auth switch-tenants. When you switch tenants, the auth.toml file will be updated so all future Cloud interactions use the new tenant.
If you want to remove your key from your machine, run prefect auth logout
Transitioning from a token to an API key
If you're using a token right now, we encourage you to switch over!
First, create a new key:
$ prefect auth create-key --name "<something-familiar>"
Then, run prefect auth logout to log out from your current tenant then run prefect auth logout again to remove your API token from your machine entirely. This take two runs so that the old behavior of the prefect auth logout command is retained while using a token.
You may get some warnings that your API token has been set elsewhere when you try using the CLI to logout. This means you've set your auth token in the config.toml at prefect.cloud.auth_token or in the env var PREFECT__CLOUD__AUTH_TOKEN. Just remove that config setting and you're logged out.
Since API keys can be used to run agents, you can also remove prefect.cloud.agent.auth_token or PREFECT__CLOUD__AGENT__AUTH_TOKEN. You won't be needing them anymore.
Now that you've purged your token, you can login with the API key you generated as described above.
Service accounts
When running agents in production or registering flows in CI, it doesn't make sense to tie API interactions to your specific user account. For this, we've introduced service accounts which are scoped to a single tenant. You can create service accounts and an associated API key from the UI.
Then, in your CI job you can set an environment variable PREFECT__CLOUD__API_KEY="<API-KEY>" or use the login CLI as described above
In conclusion
We've rehauled the Prefect Core Python API for running and inspecting flow runs and exposed some powerful new patterns. In the process, we rewrote most of the flow run documentation. We've also simplified authentication while providing more powerful access control. We can't fit everything in a blog post, so be sure to check out the release changelog to see everything else.

We're excited to see what you can do with these new features and we're always looking for more feedback so we can continue to make the best orchestration tool around!
Please continue reaching out to us — we appreciate the opportunity to work with all of you!
P.S. we're hiring! We are currently seeking enthusiastic new colleagues for a variety of engineering, marketing, operations, product, and sales roles. Open positions are always posted on our website and we have referral incentives for anyone in our community that introduces us to someone extraordinary!
Happy Engineering!
— The Prefect Team