
Unveiling Interactive Workflows

We've introduced the ability to interact with workflows at runtime.

January 30, 2024
Andrew Brookins
Lead Staff Engineer
Chris Pickett
Staff Engineer

Have you ever wanted to pause a workflow to get human input? With Prefect’s new interactive workflows, you can describe the data your flow needs, and Prefect will render a form in its UI for users to enter the data. But this capability goes beyond human input—interactive workflows can communicate with each other, too.

It’s now easier than ever to write flows that pause for human-in-the-loop input, manage continuous data streams, and more. Curious about how this works in practice? Read on for a detailed example that explains how to add interactivity to your flows.
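
To give you a taste of flow-to-flow communication, here’s a rough sketch that uses the send_input and receive_input helpers with a small RunInput model (more on RunInput below; the flow names and the Greeting class are illustrative, and our guide covers the full API):

from prefect import flow
from prefect.input import RunInput
from prefect.input.run_input import receive_input, send_input


class Greeting(RunInput):
    name: str


@flow
async def greeter():
    # Listen for Greeting inputs sent to this flow run and greet each one.
    async for greeting in receive_input(Greeting, timeout=None):
        print(f"Hello, {greeting.name}!")


@flow
async def sender(greeter_flow_run_id: str):
    # Send a Greeting to the running greeter flow, identified by its flow run ID.
    await send_input(Greeting(name="world"), flow_run_id=greeter_flow_run_id)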

💡 Want to jump right into code? Check out our example repository for everything used in this blog post or read our guide to creating interactive workflows.

Pausing (or Suspending) to Get Input

One way to build an interactive workflow is to call either the pause_flow_run or suspend_flow_run functions in your flow and describe the data you want to receive.

💡 Pausing a flow keeps your flow running while it waits for input. Suspending a flow causes the flow to exit, allowing infrastructure to deprovision. When Prefect receives input for your suspended flow, it runs your flow again. See our documentation for more details on pausing and suspending flows.

These functions will stop executing your flow to wait for input and then resume when Prefect receives the input. The return value of calling either function will be an instance of the data you described.
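
For example, a release approval flow that suspends until a human responds might look like this (a minimal sketch; the flow and the input it asks for are illustrative):

from prefect import flow, suspend_flow_run


@flow
async def deploy_release():
    # The flow run exits here, so its infrastructure can spin down. When a
    # human submits a name in the Prefect UI, Prefect runs the flow again
    # and this call returns their input.
    approver = await suspend_flow_run(wait_for_input=str)
    print(f"Release approved by {approver}. Shipping!")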

You can specify the type of data you want as input in a few ways:

  • A type annotation describing a built-in type, such as str, int, or Enum
  • A Pydantic BaseModel
  • A Prefect RunInput (a subclass of pydantic.BaseModel)

💡 prefect.input.RunInput is a lightweight wrapper around Pydantic’s BaseModel. This means that you can use the same notation for fields and validation rules in your RunInput subclass as you would when creating a Pydantic model.

Here is an example Enum that we’ll use later to ask a user if an image is a dog or a pig:

from enum import Enum


class Animal(Enum):
    """
    The possible labels that the model can return
    """
    PIG = "pig"
    DOG = "dog"

Because we only want to receive one value as input—the label—we can stop here. If we wanted multiple values, we could use a BaseModel or a RunInput like this:

from enum import Enum

from prefect.input import RunInput


class Animal(Enum):
    """
    The possible labels that the model can return
    """
    PIG = "pig"
    DOG = "dog"


class ImageInput(RunInput):
    label: Animal
    comment: str

ImageInput is a subclass of RunInput with two fields: label, of type Animal, and comment, a free-text string. RunInput is a subclass of pydantic.BaseModel, so you can use all the Pydantic features that BaseModel provides, like validation and default values.

💡 RunInput classes can contain nested RunInput classes to describe complex data structures. This is also true of BaseModel classes.
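
For instance, you could give the comment field a default value and a description, exactly as you would with any Pydantic model (a small sketch building on ImageInput):

from enum import Enum

from pydantic import Field

from prefect.input import RunInput


class Animal(Enum):
    PIG = "pig"
    DOG = "dog"


class ImageInput(RunInput):
    label: Animal
    # Defaults and field metadata work just like they do in Pydantic.
    comment: str = Field(default="", description="Anything else we should know?")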

To keep things simple, we’ll use the Animal enum to define our input, instead of ImageInput. Now we can build a human-in-the-loop flow that pauses to wait for this input. To accomplish this, we’ll call the pause_flow_run function in our flow and pass in our Animal class.

Here’s the basic idea, which we’ll expand on later in this post:

from prefect import flow, pause_flow_run


@flow
async def classify_image(image: bytes):
    # Imagine that by this point, we've notified the user that we
    # need them to label an image. Now, we pause the flow to wait
    # for their label.
    image_label = await pause_flow_run(wait_for_input=Animal)

This flow only does one thing: pausing to wait for an Animal input. Let’s expand on the example. We’re going to write a flow that asks a model whether an image is a pig or a dog. If the model isn’t confident enough, the flow will pause and wait for a human to label the data.

Example: An Image Classifier

The example will get an image file and ask a model (that should probably be in quotes: “model”) to classify it as either a pig or a dog. When the model’s confidence is below a certain threshold, we’ll send a notification that we need a human to label the image. Then we’ll pause to wait for the label.

Let’s create the fake image classifier:

import random
from enum import Enum

# Set the minimum confidence threshold, below which the model
# prompts for user input.
Confidence = int
MIN_CONFIDENCE: Confidence = 90


# The possible labels the model or a human can use
class Animal(Enum):
    PIG = "pig"
    DOG = "dog"


# A classifier that returns a random label and confidence score
class GuessingClassifier:
    def classify(self, image) -> tuple[Animal, Confidence]:
        return random.choice(list(Animal)), random.randint(0, 100)


# Initialize the classifier so it can be used by the flow
image_classifier = GuessingClassifier()

Nothing too exciting here, but it’s enough to get us started.

We’re going to notify a human that we need their help using a Slack notification block. Here it is:

from prefect.blocks.notifications import SlackWebhook

slack_block = SlackWebhook.load("help-us-humans")

💡 Slack has special formatting rules for sending links and other HTML. This example uses Slack’s mrkdwn markup language.
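
If you haven’t created the "help-us-humans" block yet, you can save one ahead of time and then load it by name as shown above (the webhook URL below is a placeholder):

from prefect.blocks.notifications import SlackWebhook

# Register a reusable Slack webhook block under the name used above.
# Replace the placeholder URL with your own incoming webhook URL.
SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ").save("help-us-humans")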

Now let’s tie the pieces together with an expanded version of our flow.

import asyncio

from prefect import flow, get_run_logger, pause_flow_run, settings, task
from prefect.context import get_run_context

# Animal, Confidence, MIN_CONFIDENCE, image_classifier, and slack_block
# are defined in the earlier snippets.

# An image of a pug.
# DodosD, CC BY-SA 3.0 <https://creativecommons.org/licenses/by-sa/3.0>,
# via Wikimedia Commons
PUGLY = "https://upload.wikimedia.org/wikipedia/commons/d/d7/Sad-pug.jpg"


@task
async def classify(image) -> tuple[Animal, Confidence]:
    return image_classifier.classify(image)


@task
async def load_image(url: str) -> bytes:
    return b"fake image bytes"


MESSAGE = "Help us, humans! Please view <{image_url}|this image> and classify it."


@flow
async def classify_image(image_url: str = PUGLY):
    logger = get_run_logger()
    image = await load_image(image_url)
    label, confidence = await classify(image)

    if confidence < MIN_CONFIDENCE:
        message = MESSAGE.format(image_url=image_url)
        flow_run = get_run_context().flow_run

        if flow_run and settings.PREFECT_UI_URL:
            flow_run_url = (
                f"{settings.PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
            )
            message += (
                f"\n\nAfter you view the image, open the <{flow_run_url}|paused flow run> "
                "and click Resume to classify the image."
            )

        await slack_block.notify(message)

        # Pause and wait for a human to choose an Animal label in the UI.
        user_label = await pause_flow_run(wait_for_input=Animal)

        if user_label == label:
            logger.info("The model was right!")
        else:
            logger.info("The model was wrong!")
    else:
        logger.info(
            f"The model was {confidence}% confident that the image was of a {label.value}!"
        )


if __name__ == "__main__":
    asyncio.run(classify_image.serve(name="guessing-classifier"))

This flow uses two tasks: load_image, which fetches the image, and classify, which returns a label and a confidence score. If the confidence score is below a certain threshold, we pause the flow and ask the user to provide the correct label.

Clicking the link to open the paused flow run and then clicking Resume will display a form asking for the label input.

Once the user gives us a label, the flow resumes execution, now with the input the user submitted. In this case, the return value of pause_flow_run is an Animal.
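
If we had paused with the ImageInput class from earlier instead, pause_flow_run would return an ImageInput instance, and we’d read the human’s choice from its label field, as in this sketch:

# Pausing with a RunInput subclass returns an instance of that class.
image_input = await pause_flow_run(wait_for_input=ImageInput)

if image_input.label == label:
    logger.info("The model was right!")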

💡 The example image we used was, in fact, pretty hard to classify. If you’re brave enough, test your own powers of classification—is this a dog or a pig?

Try it out!

Interactive workflows are an exciting addition to Prefect, and your feedback is invaluable as we refine and enhance this feature. Interactivity opens up tons of new use cases for flows, from simple approval workflows to building LLM-powered chatbots. So fire up your text editor and see where your imagination takes you!


Try Prefect Cloud for free, download our open source package, join our Slack community, or talk to one of our engineers to learn more.