Practice Makes Prefect

Prefect's beautiful API is familiar the first time you see it.
No new languages, no config files, and minimal boilerplate.

Prefect's core objects are Tasks, which perform actions, and Flows, which organize Tasks. In this tutorial, we step through a few examples to show how Prefect combines simple building blocks into complex data workflows.

Upgrade from Airflow

Breaking up is hard to do, so Prefect can run Airflow for you.

It only takes one line of code to convert your Airflow DAGs to Prefect Flows. Take advantage of Prefect's advanced features without leaving your AirflowDB behind.

We're currently testing Airflow backwards-compatibility with select partners - get in touch if you're interested!


HELLOWORLD

In a simple sense, Prefect tasks are just functions. The @task decorator makes it easy to create new ones:
from prefect import Flow, task

@task
def say_hello():
    print("Hello, world!")
To properly run this task, we need to add it to a flow. Let's use Prefect's functional API to open a Flow context and add the task. (Don't worry, there's a fully imperative API too!)
with Flow("Hello") as flow:
    say_hello()
Finally, we can test our flow locally by calling flow.run():
flow.run()
# prints: "Hello, world!"
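
It may look odd that calling say_hello() inside the with block doesn't print anything until flow.run(). Here's a toy model of that "register now, run later" pattern in plain Python. ToyFlow and toy_task are hypothetical names invented for this sketch; this is not Prefect's implementation, just one way such an API could behave. The function returns its greeting instead of printing it so the result is easy to inspect.

```python
class ToyFlow:
    """Toy stand-in for a flow: remembers registered functions and runs them later."""

    active = None  # the flow whose `with` block is currently open, if any

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        ToyFlow.active = self
        return self

    def __exit__(self, exc_type, exc, tb):
        ToyFlow.active = None
        return False  # do not swallow exceptions

    def run(self):
        # Only now are the registered functions actually called.
        return [fn() for fn in self.tasks]


def toy_task(fn):
    """Toy stand-in for a task decorator: calling the result registers fn."""

    def register():
        if ToyFlow.active is None:
            raise RuntimeError("call this inside a `with ToyFlow(...)` block")
        ToyFlow.active.tasks.append(fn)

    return register


@toy_task
def say_hello():
    return "Hello, world!"


with ToyFlow("hello") as flow:
    say_hello()  # registers the function; nothing runs yet

results = flow.run()
print(results)
```

The point of the sketch is the split between building the flow (inside the with block) and running it (the run() call), which is what lets a workflow be described once and executed many times.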

DATAFLOW

Prefect supports parameterized dataflow, which means tasks can process specific outputs of other tasks. This makes it extremely easy to build robust data pipelines.

Let's modify our task to say "Hello!" to an argument called name, and create another task that returns random names:
import random

@task
def say_hello_to(name):
    print(f"Hello, {name}!")

@task
def random_name():
    return random.choice(["Ford", "Arthur"])
When we create our flow, we compose these tasks just as if they were Python functions:
with Flow("Dataflow") as flow:
    name = random_name()
    say_hello_to(name=name)
Running the flow now results in a random greeting (note: your results may vary!):
flow.run()
# prints: "Hello, Arthur!"
flow.run()
# prints: "Hello, Ford!"

FLOWPARAMS

Parameters are special tasks whose value is provided when the flow is run. When combined with dataflow, they can be used to provide inputs or configuration at runtime. Essentially, they turn flows into functions.

Let's use a parameter to make our flow say hello to a specific person. All we have to do is replace our random_name task with an appropriately named Parameter:
from prefect import Parameter

with Flow("Parameters") as flow:
    name = Parameter("name")
    say_hello_to(name=name)
Now, we provide a name whenever we run the flow:
flow.run(name="Ford")
# prints: "Hello, Ford!"
flow.run(name="Marvin")
# prints: "Hello, Marvin!"
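
To see what "turning flows into functions" means, it can help to write the same logic as an ordinary Python function. This is only an analogy in plain Python, not Prefect code: parameters_flow is a hypothetical name, and the greeting is returned rather than printed so it can be checked.

```python
def say_hello_to(name):
    # Plain-function version of the task; returns the greeting instead of printing it
    return f"Hello, {name}!"


def parameters_flow(name):
    # The argument plays the role of the Parameter: the caller supplies it at run time
    return say_hello_to(name=name)


print(parameters_flow(name="Ford"))
print(parameters_flow(name="Marvin"))
```

Calling flow.run(name="Ford") reads much like calling parameters_flow(name="Ford"): the flow definition stays fixed and only the input changes.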

MAPTASKS

One of Prefect's most useful control flow features is the map operator. When a task is mapped over its inputs, Prefect automatically creates copies of the task in order to process the inputs in parallel. By composing multiple mapped tasks, you can easily create parallel pipelines.

To modify our flow to process a list of names in parallel, we only have to make one minor change: we call the task's map() method instead of calling the task itself. (OK, so there's one other change: we rename the Parameter from "name" to "names" for clarity.)

This flow tells Prefect that say_hello_to will be mapped over the names parameter:
with Flow("Mapping") as flow:
    say_hello_to.map(name=Parameter("names"))
If we provide a list of names when we run the flow, we see the expected result:
flow.run(names=['Marvin', 'Ford', 'Arthur'])
# prints: "Hello, Marvin!"
# prints: "Hello, Ford!"
# prints: "Hello, Arthur!"
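
For intuition, here's roughly what mapping a function over a list in parallel looks like using only the Python standard library. This is a sketch of the general idea, not of how Prefect schedules mapped tasks; the thread pool and the choice of three workers are assumptions made for this example, and the greeting is returned rather than printed.

```python
from concurrent.futures import ThreadPoolExecutor


def say_hello_to(name):
    # Plain-function version of the task; returns the greeting instead of printing it
    return f"Hello, {name}!"


names = ["Marvin", "Ford", "Arthur"]

# One call per name, potentially running concurrently.
# Executor.map yields results in the same order as the inputs.
with ThreadPoolExecutor(max_workers=3) as pool:
    greetings = list(pool.map(say_hello_to, names))

for greeting in greetings:
    print(greeting)
```

With Prefect, the map() call expresses the same "one copy per input" intent without us having to manage a pool ourselves.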

HELLOETL

Our tutorial flow is well and good, but Prefect's real "Hello, world!" is a production-ready ETL pipeline in fewer than 20 lines of code:
from prefect import Flow, task


@task
def extract():
    '''Return a list of data'''
    return [1, 2, 3]


@task
def transform(x):
    '''Multiply a single item by 10'''
    return x * 10


@task
def load(y):
    '''Print the result'''
    print("Received y: {}".format(y))


with Flow("ETL") as flow:
    e = extract()
    t = transform.map(e)
    l = load(t)


flow.run()
# prints: "Received y: [10, 20, 30]"
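
It's worth seeing why transform is mapped here rather than called directly. The plain-Python sketch below mirrors the three steps without Prefect (so it is an analogy, not the flow itself): applying transform per item gives the expected result, while applying it to the whole list would repeat the list instead of multiplying its values. The names mapped and unmapped are introduced only for this illustration, and load returns its message rather than printing it.

```python
def extract():
    """Return a list of data"""
    return [1, 2, 3]


def transform(x):
    """Multiply the input by 10"""
    return x * 10


def load(y):
    """Build the message that the flow's load task prints"""
    return "Received y: {}".format(y)


# Mapped: transform is applied to each item separately
mapped = [transform(item) for item in extract()]

# Unmapped: transform receives the whole list, and list * 10 repeats the list
unmapped = transform(extract())

print(load(mapped))
print(len(unmapped))
```

The mapped version yields [10, 20, 30], matching the flow's output, while the unmapped version yields a 30-element list of repeated values.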

ANDMORE

We've barely scratched the surface, and we haven't even mentioned Prefect Cloud.

Stay tuned.
