Getting Started with Prefect 2.0
Jun. 6, 2022

This all-encompassing getting started guide lays the foundation for workflow orchestration with Prefect 2.0.

Peyton Runyan, Software Engineer

Workflow orchestration frameworks are primarily used to coordinate, monitor and observe the movement of data in production applications.

Such frameworks typically include a family of independent features that collectively make modern data pipelines fault-tolerant and robust.

These features include:

  • scheduling and triggering jobs

  • retries

  • dependency and state management

  • caching expensive tasks

  • deploying runs to different environments

  • fine-grained visibility into the execution state of all tasks in your data workflows

These features allow us to gracefully handle failure events, including scenarios beyond our control such as cloud outages or API failures. Without explicit state tracking, data pipelines become prone to triggering jobs prematurely, re-running already completed work, or failing haphazardly.

The features workflow orchestration provides are not limited to supporting the scheduled movement of data from a source to a destination.

These features are also heavily applied in other domains such as machine learning and parameterized report generation. Presently, workflow orchestration is getting simple enough for hobbyists to adopt for personal projects.

Below we're going to demonstrate a very basic example of what happens when you don't take orchestration into account. The following code is designed to be run from a Jupyter notebook. If you would like to run it from a .py file, you will need to add the following block to the end of the file:
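A minimal sketch of that block, assuming the flow function defined in the sketch below is named pipeline:

```python
# Only needed when running the example from a .py file rather than a notebook
if __name__ == "__main__":
    pipeline("Hello!")
```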

If you've ever written code before, the problems caused by the code below will not be new to you, but it is worth making explicit.

The code mimics a simple data pipeline, which makes a call to an API service, augments the data, and then writes the results to our database.

The major difference is that the API call that we are making will fail half of the time. This is hopefully much more frequently than your API calls will fail in production, but it is useful for demonstration purposes.
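A minimal sketch of such a pipeline is shown below. The helper names (call_unreliable_api, augment_data, write_to_database) are illustrative rather than taken from the original listing, and the "database write" is just a print statement:

```python
import random


def call_unreliable_api(msg: str) -> dict:
    # Simulate an API call that fails roughly half of the time
    if random.random() < 0.5:
        raise Exception("Our API call failed!")
    return {"result": msg}


def augment_data(data: dict) -> dict:
    # Pretend to enrich the raw response
    data["augmented"] = True
    return data


def write_to_database(data: dict) -> None:
    # Stand-in for a real database write
    print(f"Wrote to database: {data}")


def pipeline(msg: str):
    data = call_unreliable_api(msg)
    data = augment_data(data)
    write_to_database(data)
```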

Try running the pipeline a few times.

This is obviously a trivial example, and as engineers, we know to expect these things and deal with them. But, dealing with ways code fails is NOT what we set out to do. We set out to write a data pipeline.

The process of writing code that deals with failures, instead of writing code that performs the actions that we want to be done, is something that we at Prefect refer to as Negative Engineering.

Negative Engineering happens when engineers write defensive code to make sure the positive code actually runs. This defensive code must anticipate the almost limitless number of ways that code can fail, and writing it is a massive time sink.

Prefect aims to eliminate as much negative engineering as possible for you.

It's easier to show than it is to tell, so let's run this next block and then we'll explain what is happening.

First, make sure that Prefect 2.0 is installed. We will be using version 2.0b3 in this tutorial.

You can run pip install -U "prefect>=2.0b3" in your command line, or you can run !pip install -U "prefect>=2.0b3" in a Python cell in your Jupyter notebook.

To create a flow, we simply import flow from prefect and then add it as a decorator to our pipeline function. You can see the modifications that we’ve made to our flow below. Any lines that have modifications will be tagged with the comment # NEW **** .
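Building on the sketch above, the modified version might look like this (the helper functions are unchanged):

```python
from prefect import flow  # NEW ****


@flow  # NEW ****
def pipeline(msg: str):
    data = call_unreliable_api(msg)
    data = augment_data(data)
    write_to_database(data)
```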

Now let's try running our pipeline again a few times.

Our flow still sometimes succeeds and sometimes fails, but something interesting happened: we now have logs, and we see that the error is caught instead of halting execution.

These logs aren't just printed. These are actually persisted in a database without any effort on our part. This gives us visibility into how our code fails or succeeds.

By default, the database is a SQLite database that lives at ~/.prefect.

If you execute ls ~/.prefect in your terminal, or !ls ~/.prefect inside of your notebook, you should see orion.db listed. This is our SQLite database. You may also see other files. Ignore them for now.

I like to use an application called DB Browser for SQLite to examine my SQLite databases. You can download the browser here.

We can see the tables that Prefect created in our database below. The table that we care about right now is log.

[Image: Prefect 101 - Databases 1]

If we look at the data in our log table, we can see that our logs were, in fact, saved!

[Image: Prefect 101 - Databases 2]

Flows are only the first step in orchestrating our data pipelines. The next step is adding Prefect tasks.

A task can be thought of as a discrete unit of work. In practice, you'll often simply convert the functions that make up your flow into tasks.

Like flows, tasks are created by adding a decorator. We'll demonstrate below.
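Continuing the same sketch, each helper function gets the task decorator:

```python
import random

from prefect import flow, task  # NEW ****


@task  # NEW ****
def call_unreliable_api(msg: str) -> dict:
    # Simulate an API call that fails roughly half of the time
    if random.random() < 0.5:
        raise Exception("Our API call failed!")
    return {"result": msg}


@task  # NEW ****
def augment_data(data: dict) -> dict:
    data["augmented"] = True
    return data


@task  # NEW ****
def write_to_database(data: dict) -> None:
    print(f"Wrote to database: {data}")


@flow
def pipeline(msg: str):
    data = call_unreliable_api(msg)
    data = augment_data(data)
    write_to_database(data)
```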

Let's run our pipeline again!

Again, we're still getting errors, but our logs are clearer, and we have better visibility into our data pipeline.

The first feature of tasks that we're going to show is names. Names allow you to add descriptive titles to your tasks, independent of the underlying function's name.

We add names by passing the name parameter to the task decorator. (Note: you can also add names to flows using the name parameter)
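For example, continuing the sketch, we can give the API task a descriptive name and name the flow itself. The task name string here is just an example; the flow name matches the one you'll see in the UI later in this post:

```python
@task(name="Call unreliable API")  # NEW ****
def call_unreliable_api(msg: str) -> dict:
    if random.random() < 0.5:
        raise Exception("Our API call failed!")
    return {"result": msg}


@flow(name="Previously unreliable pipeline")  # NEW ****
def pipeline(msg: str):
    data = call_unreliable_api(msg)
    data = augment_data(data)
    write_to_database(data)
```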

The next feature that we will demo is the ability to retry a task. We know that tasks will inevitably fail. Sometimes this requires complex behavior, but other times we simply need to try again after a brief delay. We can do this with the retries and retry_delay_seconds parameters.

This will be helpful for our unreliable API call.

Let's update our code.
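Continuing the sketch, only the API task changes; the retry values below (3 retries, 1 second apart) are arbitrary example settings:

```python
@task(name="Call unreliable API", retries=3, retry_delay_seconds=1)  # NEW ****
def call_unreliable_api(msg: str) -> dict:
    if random.random() < 0.5:
        raise Exception("Our API call failed!")
    return {"result": msg}
```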

Now let’s give this pipeline a try.

If you run it a few times, you can see that we still encountered an exception and that the exception was logged. But this time we were probably able to overcome it by retrying, allowing the Flow to complete in the manner we intended.

So far we have covered trivial examples inside of a Jupyter Notebook. To demonstrate the power of Prefect, we'll need to begin moving outside of the notebook environment.

We'll stick with the trivial example for now so that we can focus on concepts.

You should move your code from the last example to a file named trivial-flow.py. There’s no need to add an if __name__ == "__main__" block for this file. Deployments, which we’re about to introduce, will handle this for us.

To perform any of the complex orchestration actions that are coming up, we'll need to create a deployment. These sound a lot more complex than they are. A deployment just contains a reference to your flow code and any additional metadata needed to create flow runs.

To create a deployment you need two things:

  • A flow

  • A deployment specification

We already have a flow, so let's create a deployment specification. The code will be provided below, but it needs to live in a .py file. Paste this code in a new file named trivial-deployment.py at the same level as trivial-flow.py.
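The listing below is a sketch reconstructed from the fields described next, using the 2.0b3-era import paths; the deployment name is just an example:

```python
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner

DeploymentSpec(
    name="trivial-deployment",                    # example name for this deployment
    flow_location="./trivial-flow.py",            # relative path to the file containing the flow
    flow_name="Previously unreliable pipeline",   # the name given in the flow decorator
    parameters={"msg": "Hello from my first deployment!"},
    tags=["ETL"],
    flow_runner=SubprocessFlowRunner(),           # run the flow locally in a subprocess
)
```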

Our flows tell us what we want to do, our deployment specifications tell us how we're going to do it.

Specifically, we use the DeploymentSpec class to handle specifying the details of how our flow is going to run.

  • name is the name that we are giving our deployment.

  • flow_location is where the file containing the code that we want to run is located. We can use a relative file path for this.

  • flow_name is the name that we provided in our flow decorator.

  • parameters are the parameters that we want to pass to our flow. If we look back at our flow code, we can see that the function signature is pipeline(msg: str). So we pass a dictionary {'msg': 'Hello from my first deployment!'} to our DeploymentSpec, where the key 'msg' corresponds to our flow's parameter msg and the value 'Hello from my first deployment!' is what will be passed to the flow.

  • tags are used to organize deployments. Maybe you have machine learning flows and ETL flows, and you want to quickly be able to separate them. You can tag your machine learning flows with "ml" and your ETL flows with "ETL". Tags are also used for work queues, which we will explain soon.

  • flow_runner specifies the way that you want this flow run. We're going to handwave this a bit for now so that it's not too distracting. The important thing to know for now is that SubprocessFlowRunner is used to run a flow on your local machine.

Most of the fun in Prefect happens through the prefect command-line tool. The general syntax is prefect <thing-you-are-dealing-with> <command>.

If at any point you forget the command, try passing --help and you will receive helpful documentation. This works at all levels of command granularity. For example, prefect --help, prefect work-queue --help, and prefect work-queue create --help will all offer you documentation for each of the associated CLI commands.

In our case, we're dealing with deployments, and we want to create one using the deployment specification named trivial-deployment.py.

We can accomplish this by running prefect deployment create "trivial-deployment.py".

When you create a deployment, Prefect saves your code in your storage, and notes its location. Prefect also saves the information from your DeploymentSpec in its database. This distinction — that your code goes in YOUR storage — is incredibly important. Prefect is able to orchestrate the execution of your code without ever seeing it, allowing for greater security and privacy.

If you navigate to ~/.prefect, you should see both the database and the directory labeled storage.

If we look in the storage directory, we'll see a file with a name that is a mix of letters and numbers.

This is the file that contains our flow code. If we use the cat command to examine it, we'll see a copy of our code in the state that it was in when we created our deployment.

Because we've made a copy, if we modify our code in our trivial-flow.py file, it will not change the code that the deployment runs. This keeps our deployments stable while we continue to develop our flows.

Our code is saved to storage, but information about the deployment is saved to our database. Using the database explorer, we can examine the deployment table, where we will see the deployment that we just created.

[Image: Prefect 101 - Deployment Database Table]

You can see that all of our information is there -- the name, the tags, the parameters, and so on.

We now have flows and deployments. It's time to introduce the Prefect UI. This will allow us to visually inspect our flows and deployments, schedule them, and handle a variety of other important tasks. Right now it will have limited functionality, but in only a few more steps, we'll have access to its full capabilities.

To access the UI, type the command prefect orion start in your terminal. This will launch a long-running Prefect application. You should see Check out the dashboard at http://127.0.0.1:4200. If you scroll up in your terminal a bit, your output should look similar to the picture below:

Click on the link or paste the URL into your web browser to see the Prefect UI. Make sure NOT to include /api in the URL that you paste into your browser; otherwise, you will get an error. You should now be able to see the UI, including red and green bars showing the failed and successful runs of our flows. The UI is likely to change as Prefect continues to evolve, so if yours looks different, don’t panic.

Take a moment and explore!

[Image: the Prefect UI]

The UI is able to show us the information that Prefect has been saving to our database. If you click around, you will also find the logs for individual flows and tasks.

Work queues are one of the last two pieces that we need to begin full orchestration of our flows.

Remember tags from our DeploymentSpec? The examples that we used were ml for machine learning and ETL for, well, ETLs.

Work queues are ways for us to organize flows that we want to run, based on things such as tags. You can think of a work-queue as a fancy filter if you'd like.

To create a work queue we use the command prefect work-queue create -t <tag> <queue-name>, where <queue-name> is the name that you want to give the work queue. When you run the command, the console output will be the work queue's unique ID.

Open a new console (because your other console is running the Prefect API server) and run the command prefect work-queue create -t "ETL" etl-queue. Your output should look similar to the image below:

[Image: output of the work-queue create command]

If we check the work_queue table of our database we will see etl-queue there.

[Image: Prefect 101 - Work Queues]

If you ever forget which work queues you have available, you can run the command prefect work-queue ls to view them.

Return to the Prefect UI, click on the Flows tab, and then click on the flow Previously unreliable pipeline. This will open the sidebar. Once the sidebar is open, look for the Deployments heading. To the right, you will see the Quick Run button. Go ahead and click it. You should briefly see a pop-up that says "Flow Run Scheduled".

Your run history should now have a new addition: a yellow bar. If you hover over it, you will see that it says Late flow runs. This is because we clicked Quick Run, which schedules a flow to be run immediately, but the flow has not started running.

[Image: late flow run in the UI]

We can check the flows in our work queue by running the command prefect work-queue preview <work-queue-id>. If you run this command, you’ll see that we definitely have a flow lined up in our work queue, but it is late.

What's wrong now?

We mentioned elsewhere that your code stays with your infrastructure. This means that the Prefect application never has direct access to it, so it can't run your code for you. All it can do is say which code needs to be run, and pass information that you provided in your DeploymentSpec. This is the hybrid execution model. You supply the code, we supply the orchestration.

In order to start a flow on your computer from the application, we need a Prefect agent.

An agent is a long-running process that lives on your compute infrastructure. When we create the agent, we provide it a work queue to monitor. It then queries the Prefect application every five seconds, checking if there is code that it should run. When a flow is added to the work queue that the agent is responsible for, the API sends the agent the flow information. The agent then kicks off the flow run.

You'll want to open up a new terminal window because the agent is a long-running process. Then run the magic command prefect agent start <work-queue-id>. If you've forgotten your work queue’s unique ID, you can check with prefect work-queue ls.

Your agent will immediately begin executing the flow that is in your etl-queue. If you check your work queue again using prefect work-queue preview <work-queue-id> you will see that it is empty.

If you check the Flows heading, you will now see that the bar that was previously yellow is now green. If you look under the Flow Runs heading, you will see that the dot that was yellow is also green.

[Image: completed flow run in the UI]

Now if you click Quick Run again, the flow will immediately execute.

Congratulations! You now have all of the basic components needed to orchestrate flows with Prefect. There are still many areas to explore, including sub-flows, setting up permanent storage, flow runners, configs and profiles, and so much more!

A great next step is to check out our Concepts Documentation or Getting Started with Prefect Cloud Tutorial.

Good luck, and feel free to join our Slack Community if you have any questions or cool projects that you would like to show off!

Happy Engineering!
