How to Make Your Data Pipelines More Dynamic Using Parameters in Prefect
Jan. 13, 2022

How to pass runtime-specific parameter values to your data pipelines

Anna Geller, Lead Community Engineer

Parametrization is one of the most critical features of any modern workflow orchestration solution. It allows you to dynamically overwrite parameter values for a given run without having to redeploy your workflow. Most orchestration frameworks provide rather limited functionality in that regard, such as only allowing you to override global variables. Prefect, however, provides a first-class abstraction for handling dynamic, parametrized workflows. Let’s look at it in more detail.

Prefect’s Parameter task is a simple abstraction for running dynamic workflows that adjust their behavior based on the parameter value. To use it, define your Parameter task inside the Flow context, for example Parameter("x"), and pass it to the tasks that need the value.

Prefect provides a great amount of flexibility with respect to how you want to trigger your flow. Let’s look at six different ways to start a parametrized workflow.

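When running your flow locally, you can pass your parameter values as a dictionary to the parameters keyword argument of flow.run(), for example flow.run(parameters={"x": 3}). Each call triggers one flow run, so calling flow.run() three times with different values triggers three flow runs, each with a different parameter value.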

If you prefer running your flows from the CLI, here is how you can trigger a flow run with custom parameter values:

prefect run -p parametrized_flow.py --param x=3

The output of this:

[Figure: parametrized local flow run from the Prefect CLI]

You can see that when using the Prefect CLI, you don’t need to use sys.argv, argparse, click, or similar command-line interface libraries.

Let’s say that you want to trigger your flow from a terminal, but you want to run it on your remote agent (e.g. in a Kubernetes cluster). To do that, you need to register your flow:

prefect register --project community -p parametrized_flow.py

[Figure: registering the flow from the CLI]

To trigger a remote flow run that will be picked up by your corresponding agent, use the CLI and reference the flow by its name:

prefect run --name parametrized_flow --param x=42 --watch

[Figure: remote flow run triggered from the CLI by flow name]

But what if you need to start a flow run from a serverless function or some other programming language? You can leverage the GraphQL API. Here is how we can use the same parametrized flow in an API call:
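As a minimal sketch of such a call, the snippet below builds the HTTP request with Python's standard library only. It assumes the Prefect 1.x GraphQL create_flow_run mutation and the Prefect Cloud endpoint; the flow ID and API key are placeholders, and the helper name build_flow_run_request is hypothetical. The request is only constructed here, not sent.

```python
import json
import urllib.request

# Assumed Prefect Cloud 1.x endpoint; a self-hosted Prefect Server has its own URL.
API_URL = "https://api.prefect.io"

# Assumed shape of the mutation that creates a flow run.
CREATE_FLOW_RUN = """
mutation($input: create_flow_run_input!) {
  create_flow_run(input: $input) {
    id
  }
}
"""


def build_flow_run_request(flow_id, parameters, api_key):
    """Build (but do not send) a POST request that starts a parametrized flow run."""
    payload = {
        "query": CREATE_FLOW_RUN,
        "variables": {"input": {"flow_id": flow_id, "parameters": parameters}},
    }
    return urllib.request.Request(
        API_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )


# Placeholders: replace with your own flow ID and API key.
request = build_flow_run_request("<your-flow-id>", {"x": 42}, "<your-api-key>")
# To actually trigger the run: urllib.request.urlopen(request)
```

Because the body is plain JSON over HTTP, the same request can be reproduced from a serverless function or from any other language with an HTTP client.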

The easiest and most accessible way of triggering a parametrized flow run is the Prefect UI. To start your flow, navigate to the “Run” tab of the respective flow page:

[Figure: the Prefect UI showing a parametrized flow with the "Run" button highlighted]

From here, you can enter your desired parameter values and click on “Run.”

[Figure: the Prefect UI showing where to enter the desired parameter values before clicking "Run"]

Prefect is the only open-source workflow orchestration platform that provides first-class support for the parent-child workflow orchestration pattern. Here is how you can start a parametrized child flow run from a parent flow:

The create_flow_run task lets you specify which flow to trigger and which parameter values to use for that flow run. The additional wait_for_flow_run task ensures that the parent flow is only considered successful if the child flow ran without any issues. Thanks to the stream_logs argument, we can see the child flow run logs directly from the parent flow.

We can use the CLI to register and run the parent flow:

Regardless of whether your ad-hoc flow runs are executed locally or on remote infrastructure, Prefect allows you to trigger your parametrized workflows via:

  • a Python client,

  • a command-line interface (CLI),

  • an API call (e.g. using the requests library or even plain curl),

  • Prefect UI,

  • other (parent) flows.

No matter which of the above methods you use, with Parameter tasks you immediately gain the advantage of dynamic runtime-parametrization of your workflows.

So far, we’ve looked at how to overwrite default parameter values at runtime when triggering flows ad-hoc. Let’s explore how to attach parameter values to your schedules. The code block below demonstrates how you can run two workflows simultaneously every minute, each with different default parameter values:

This flow will trigger two flow runs every minute — one with parameter value 9, and another one with 99.

[Figure: the Prefect UI showing scheduled flow runs]

The UI allows you to inspect which parameter values have been used for each run:

[Figure: parameter values used for a flow run, as shown in the Prefect UI]

When you schedule flow runs, parameter defaults are particularly important. Without default values provided either on your Parameter task or on the clock, Prefect will not know which value to use and won’t be able to run such a flow on schedule. There are two ways of setting parameter defaults: the default argument of the Parameter task and the parameter_defaults argument of a clock.
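How the two sources of defaults interact can be modelled in a few lines of plain Python. This is a simplified model, not Prefect's implementation: the function effective_parameters and the REQUIRED sentinel are hypothetical, and the precedence (Parameter task default, then clock default, then an explicit runtime value) is an assumption based on the behavior described in this post.

```python
REQUIRED = object()  # marks a parameter that has no default anywhere


def effective_parameters(task_defaults, clock_defaults=None, overrides=None):
    """Resolve the parameter values a single flow run would see.

    Assumed precedence, lowest to highest: Parameter task default,
    clock default, value passed explicitly when triggering the run.
    """
    resolved = dict(task_defaults)
    resolved.update(clock_defaults or {})
    resolved.update(overrides or {})
    missing = sorted(name for name, value in resolved.items() if value is REQUIRED)
    if missing:
        # A scheduled run cannot start without a value for every parameter.
        raise ValueError(f"no value available for parameters: {missing}")
    return resolved


task_defaults = {"x": 1}  # default set on the Parameter task
clock_defaults = [{"x": 9}, {"x": 99}, None]  # None: this clock sets no defaults
scheduled_runs = [effective_parameters(task_defaults, c) for c in clock_defaults]
print(scheduled_runs)
```

The first two clocks reproduce the runs with 9 and 99 from the schedule above, while a clock without its own defaults falls back to the value set on the Parameter task.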

Sometimes you may need to access parameter values within a state handler. For instance, when alerting about a failed task run, you may want to include the parameter value within a Slack message. To do that, you can leverage the Prefect context:

prefect.context.parameters.get("parameter_name")

The example below shows how to use a state handler to rename a flow run based on the provided parameter value:

We can register this flow and run it twice: once with a default, and once with a custom parameter value:

By following the flow run URL printed in the CLI output, you should see that both flow runs have been renamed as intended:

[Figure: the Prefect UI showing that both flow runs have been renamed as intended]

While parameters provide an extremely convenient abstraction to make your workflows more dynamic, the default parameter values themselves are static. They can be overridden at runtime, but Prefect evaluates the defaults only once, at registration time, and stores them as “frozen” values in the backend. There are several implications of this:

  1. Don’t use dynamic dates such as datetime.datetime.today() as a default value. Since Prefect evaluates those at registration time, your data pipeline would get stuck reliving the same day, much like Phil Connors in the movie Groundhog Day.

  2. Don’t store any sensitive data as default parameter values unless you are comfortable with this data being stored in the Prefect Cloud backend. All parameter values are serialized and persisted using PrefectResult.

  3. Don’t use empty or large values such as large text or JSON documents as parameter values, since the backend API only allows payloads of up to 5 MB. Large objects can be stored in a resource like an S3 bucket, and you can use the Parameter default to point to its location.
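The first implication is easy to reproduce without Prefect. The sketch below is a plain-Python analogy, not Prefect's implementation: register_flow and run_flow are hypothetical stand-ins for registration and for a scheduled run, and a fixed date stands in for datetime.date.today() so the result is reproducible.

```python
import json
from datetime import date, timedelta


def register_flow(default_start_date):
    """Simulate registration: the already-evaluated default is stored as JSON."""
    return json.dumps({"parameter_defaults": {"start_date": default_start_date}})


def run_flow(stored_metadata, run_day):
    """Simulate a scheduled run that falls back to the stored default."""
    start_date = json.loads(stored_metadata)["parameter_defaults"]["start_date"]
    return f"run on {run_day.isoformat()} processes {start_date}"


# Stand-in for datetime.date.today() evaluated on the day of registration.
registration_day = date(2022, 1, 13)
stored = register_flow(registration_day.isoformat())

# Three daily runs after registration all see the same frozen date.
runs = [run_flow(stored, registration_day + timedelta(days=n)) for n in range(3)]
for line in runs:
    print(line)
```

Every run processes the registration date, no matter which day it executes on, because the expression was evaluated before the value was stored.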

Some Prefect users try to use parameters for backfilling workflows. This is a common anti-pattern (don’t do this):

This approach will backfire as long as the default values are used.

To mitigate the issue of default parameter values being frozen at registration time, you can use a separate task to return a dynamic default date. Then, if the parameter value is not None, your flow will use the custom value provided at runtime rather than the default date generated in your task. Here is an example that illustrates this approach (do that instead):

Note that the default parameter values are set to None. The default values are therefore generated within the tasks, and custom parameter overrides are used only when they are set explicitly when running the flow from the CLI, UI, API, or another flow.
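The core of this pattern is a small function that decides between the runtime value and a freshly computed default. The sketch below shows only that logic in plain Python; the name resolve_start_date and the ISO date format are assumptions for illustration. In a Prefect flow, a function like this would run as a task that receives a Parameter whose default is None.

```python
from datetime import date


def resolve_start_date(param_value=None, today=None):
    """Return the date a run should process.

    param_value: ISO date string supplied at runtime, or None if not set.
    today: injectable for testing; evaluated at call time when omitted.
    """
    if param_value is not None:
        # An explicit override, e.g. for a backfill, always wins.
        return date.fromisoformat(param_value)
    # No override: compute the default when the run executes, not at registration.
    return today if today is not None else date.today()


# A regular run: no parameter value, so the date is computed at run time.
print(resolve_start_date())
# A backfill run: the value passed at runtime is used instead.
print(resolve_start_date("2021-12-24"))
```

Since the fallback is computed inside the function body, it is re-evaluated on every run rather than stored once with the flow.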

Here is how we can start this parametrized flow using CLI:

💡 You don’t have to use Parameters! The easiest way of implementing backfilling flows is not by using Parameter tasks but rather by leveraging the KV Store. The documentation provides an example of how to go about it.

Apart from handling the parameter values correctly, make sure to either use the Parameter value in your downstream tasks or add it to the flow explicitly. Here is a common anti-pattern to watch out for:

When you trigger this flow, it will generate a ValueError:

ValueError: Flow.run received the following unexpected parameters: dummy
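A simplified plain-Python model of the check behind this error is sketched below. It is not Prefect's actual code: check_parameters and try_run are hypothetical helpers that only mirror the idea that a run accepts values solely for parameters that are part of the flow.

```python
def check_parameters(known_names, passed):
    """Reject values for parameters the flow does not contain."""
    unexpected = sorted(set(passed) - set(known_names))
    if unexpected:
        raise ValueError(
            "Flow.run received the following unexpected parameters: "
            + ", ".join(unexpected)
        )


def try_run(known_names, passed):
    """Return None on success, or the error message on failure."""
    try:
        check_parameters(known_names, passed)
    except ValueError as exc:
        return str(exc)
    return None


# The Parameter object exists but was never wired into the flow,
# so the flow knows no parameter names at all.
error_before = try_run(set(), {"dummy": 1})
print(error_before)

# Once the parameter is part of the flow, the same call passes.
error_after = try_run({"dummy"}, {"dummy": 1})
print(error_after)
```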

The error happens because the dummy_parameter is not used anywhere in the flow, nor is it added to the flow structure manually. Therefore, Prefect does not recognize dummy as a valid parameter. The first solution is to pass the parameter value to some downstream task as a data dependency:

Alternatively, you can explicitly add the Parameter task to your flow by passing it to flow.add_task().

The only rule regarding parameter values is that they must be JSON serializable. This means that you can’t use Python objects such as a numpy array, but you can use all JSON serializable Python data types such as strings, integers, lists, or dictionaries. For instance, the flow run from the last section had None as a default value which looks as follows in the UI:

[Figure: the Prefect UI showing a flow run with None as the default parameter value]

But if you want to set a custom value for an ad-hoc backfilling flow run, you could leverage the date picker built into the UI, which prevents you from accidentally entering wrong values such as invalid dates:

[Figure: the date picker built into the Prefect UI]
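If you trigger runs programmatically, the JSON-serializability rule and the payload limit mentioned earlier can be checked before a value is ever sent. The helper below is a hypothetical sketch using only the standard library; treating 5 MB as 5 * 1024 * 1024 bytes of encoded JSON is an assumption, not a documented Prefect constant.

```python
import json

# Assumed interpretation of the 5 MB payload limit mentioned in this post.
MAX_PAYLOAD_BYTES = 5 * 1024 * 1024


def validate_parameter_value(value):
    """Return the encoded size in bytes, or raise ValueError if the value is unusable."""
    try:
        encoded = json.dumps(value).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise ValueError(f"parameter value is not JSON serializable: {exc}") from exc
    if len(encoded) > MAX_PAYLOAD_BYTES:
        raise ValueError(
            f"parameter value is {len(encoded)} bytes, above the {MAX_PAYLOAD_BYTES} byte limit"
        )
    return len(encoded)


# Strings, numbers, lists, dictionaries, and None are all fine.
print(validate_parameter_value({"dates": ["2022-01-13"], "retries": 3}))
print(validate_parameter_value(None))

# A set is a Python object without a JSON representation.
try:
    validate_parameter_value({1, 2, 3})
except ValueError as exc:
    print(exc)
```

For objects that fail either check, pass a reference such as an S3 path as the parameter value and load the object inside a task.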

You are free to choose any name for your parameter, as long as there is only one parameter with that name in the flow. All Prefect tasks have a “slug,” which is a name that uniquely identifies a task in a flow, including the order in which this task appears in a flow. For each Parameter task, the slug is automatically and immutably set to the parameter name, which ensures that the flow has no other parameters with the same name. This is the only “gotcha” when naming your parameters.

In this post, we took a deep dive into parametrized workflows using Prefect. If you don’t want to miss any upcoming Prefect posts, sign up for our newsletter. Also, if anything we’ve discussed in this post is unclear, feel free to ask your questions in our community Slack.

We can’t wait to see what you build. Thanks for reading, and happy engineering!
