The Prefect Task Library
The Prefect community is a welcoming place for developers of all experience levels, and our task library is an excellent way to get involved with contributing to an open-source project.
October 8, 2020
CreateNamespacedJob or invoking an AWS Lambda function with
- Gain experience contributing to an open-source project
- Increase adoption of libraries, tools, and frameworks by giving Prefect users an easy way to interact with them
- Allow tasks to evolve with Prefect: as paradigms and abstractions change in Prefect, the task in the open-source library changes with them
- Open up collaboration to thousands of other developers who could use your task (they might fix bugs in the task you weren’t aware of!)
- Define: this is the first and most important step in a task’s lifecycle — defining what it does! This is also the step that is most critical to contributors of a new task, and the one we will focus on for the rest of this post.
- Initialize: Users of your task will first need to initialize, or instantiate, the task definition into a Task instance. This is a common place to specify static configuration of your task, such as the task name and default values. Note that all information provided at initialization must be known prior to running your flow.
- Bind: Prefect tasks are most interesting when considered in relation to other tasks; these relationships are managed and tracked by a Prefect flow. There are two ways to bind a task to a flow: by “calling” the task (see examples below), or by using Prefect’s imperative API and explicitly adding the task to your flow object with its associated dependencies.
- Run: The goal of all of this is ultimately to run the task within the context of a flow. This is always handled for you when you call flow.run or a flow run is triggered via a Prefect backend, taking all triggers and state handler logic into account.
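To make the four stages concrete, here is a toy, pure-Python model of the lifecycle. It is not Prefect's implementation: ToyTask, ToyFlow, AddTask, and their methods are hypothetical stand-ins that only mimic the shape of Define, Initialize, Bind, and Run. A real Prefect flow also resolves dependencies between tasks and runs them in dependency order, which this sketch skips.

```python
from typing import Any, Dict, List, Tuple


class ToyTask:
    """Hypothetical stand-in for a Prefect Task (not the real class)."""

    def __init__(self, name: str = "toy") -> None:
        # Initialize: static configuration that is known before the flow runs
        self.name = name

    def run(self, **kwargs: Any) -> Any:
        raise NotImplementedError


class ToyFlow:
    """Hypothetical stand-in for a Prefect Flow: records bound tasks, then runs them."""

    def __init__(self) -> None:
        self.bound: List[Tuple[ToyTask, Dict[str, Any]]] = []

    def bind(self, task: ToyTask, **kwargs: Any) -> None:
        # Bind: remember the task and the arguments it should receive at runtime
        self.bound.append((task, kwargs))

    def run(self) -> List[Any]:
        # Run: execute every bound task in the order it was bound (no dependencies here)
        return [task.run(**kwargs) for task, kwargs in self.bound]


# Define: subclass the base task and implement run()
class AddTask(ToyTask):
    def run(self, x: int = 0, y: int = 0) -> int:
        return x + y


add = AddTask(name="add")   # Initialize
flow = ToyFlow()
flow.bind(add, x=1, y=2)    # Bind
flow.bind(add, x=10, y=20)  # Bind the same task again with different inputs
print(flow.run())           # Run -> [3, 30]
```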
The @task decorator handles steps 1 and 2 simultaneously: the function you decorate defines your task’s runtime logic, and all keywords passed into the decorator are used when initializing the task!
```python
from prefect import task, Flow
from prefect.tasks.shell import ShellTask

ls_task = ShellTask(command="ls", return_all=True)


@task
def show_output(std_out):
    print(std_out)


with Flow("list_files") as flow:
    ls = ls_task()
    show_output(ls)
```
```python
from prefect import task, Flow
from prefect.tasks.shell import ShellTask

# Will only return the listed files
ls_task = ShellTask(command="ls", return_all=True)


@task
def show_output(std_out):
    print(std_out)


with Flow("count_files") as flow:
    ls = ls_task()
    show_output(ls)

    # Override command to count listed files
    ls_count = ls_task(command="ls | wc -l")
    show_output(ls_count)
```
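```python
from prefect import Flow, Task


class MyTask(Task):
    def __init__(self, val=None, **kwargs):
        self.val = val
        super().__init__(**kwargs)

    def run(self, val=None):
        # A value passed at runtime takes precedence over the init default
        print(val if val is not None else self.val)


my_task = MyTask(val=42)

with Flow("task-with-default") as flow:
    t1 = my_task()
    t2 = my_task(val=100)
```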
Here, MyTask is initialized before the definition of the flow, and within the flow context that task is copied twice to create two tasks. The first task uses the default value of 42, and the second task overrides it with 100. This pattern was chosen to avoid having to re-initialize the task every time it is needed in the flow. The snippet above is effectively equivalent to the following:
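```python
from prefect import Flow, Task


class MyTask(Task):
    def __init__(self, val=None, **kwargs):
        self.val = val
        super().__init__(**kwargs)

    def run(self, val=None):
        # A value passed at runtime takes precedence over the init default
        print(val if val is not None else self.val)


with Flow("task-with-default") as flow:
    t1 = MyTask(val=42)()
    t2 = MyTask()(val=100)
```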
Here, MyTask is instantiated and called two times inside the definition of the flow. The first set of parentheses is used for initializing the task, and the second is for passing run information to the task. Sometimes users attempt to pass values from upstream tasks to a downstream task's initialization function instead of to its call. That is not possible, because the results from upstream tasks do not exist until those tasks actually run; they must therefore be passed in the call, which forwards them to run:
```python
from prefect import Flow, Task, task


@task
def get_value_1():
    return 100


@task
def get_value_2():
    return 200


class MyTask(Task):
    def __init__(self, val=None, **kwargs):
        self.val = val
        super().__init__(**kwargs)

    def run(self, val=None):
        print(val if val is not None else self.val)


my_task = MyTask()

with Flow("task-passing") as flow:
    t1 = my_task(get_value_1())
    t2 = my_task(get_value_2())

# is equivalent to
with Flow("task-passing") as flow:
    t1 = MyTask()(get_value_1())
    t2 = MyTask()(get_value_2())
```
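Why upstream results cannot go into __init__: while the flow is being built, calling an upstream task yields a placeholder, not its return value. The toy sketch below makes that visible with a hypothetical Deferred wrapper whose value only exists once it is resolved at "run time". Deferred and PrintTask are illustrative names, and this is not Prefect's actual machinery.

```python
from typing import Any, Callable


class Deferred:
    """Hypothetical placeholder for a result that only exists after running."""

    def __init__(self, fn: Callable[[], Any]) -> None:
        self.fn = fn

    def resolve(self) -> Any:
        return self.fn()


def get_value_1() -> int:
    return 100


class PrintTask:
    def __init__(self, val: Any = None) -> None:
        # At init time only a placeholder can be stored, never the real value
        self.val = val

    def run(self, val: Any = None) -> Any:
        # At run time, deferred inputs passed to run() are resolved to concrete values
        if isinstance(val, Deferred):
            val = val.resolve()
        return val if val is not None else self.val


upstream = Deferred(get_value_1)      # "calling" the upstream task while building the flow
wrong = PrintTask(val=upstream)       # init receives the placeholder object itself
right = PrintTask()

print(type(wrong.val).__name__)       # Deferred
print(right.run(val=upstream))        # 100
```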
Note the separation between a task's __init__ and run functions. As described above, this separation is what allows tasks to be “templated” with configuration and reused within the context of the flow. The __init__ of the task is called before the flow runs, and the run function is where your task's logic lives, to be executed at runtime. Allowing kwargs to be set both during initialization and at runtime is key to a task's flexibility.
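The templating behavior can be mimicked in plain Python with copy.copy: each call returns a fresh copy of the initialized task, so init-time configuration is shared while call-time arguments differ. This is a simplified sketch with a hypothetical TemplateTask class; Prefect's real Task.__call__ also handles dependencies, mapping, and flow registration.

```python
import copy
from typing import Any, Dict, Optional


class TemplateTask:
    """Hypothetical task whose __call__ returns a configured copy of itself."""

    def __init__(self, val: Optional[int] = None) -> None:
        self.val = val                       # init-time default
        self.call_kwargs: Dict[str, Any] = {}

    def __call__(self, **kwargs: Any) -> "TemplateTask":
        # Copy instead of mutating, so the original instance stays a reusable template
        new = copy.copy(self)
        new.call_kwargs = dict(kwargs)
        return new

    def run(self) -> Optional[int]:
        # A call-time value, when given, overrides the init-time default
        val = self.call_kwargs.get("val")
        return val if val is not None else self.val


my_task = TemplateTask(val=42)
t1 = my_task()
t2 = my_task(val=100)
print(t1 is t2, t1.run(), t2.run())  # False 42 100
```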
A common use of this pattern is initializing a ShellTask with a specific shell and then passing in a different command at runtime. This creates two tasks in the flow, each with a different command, without having to redefine the shell type.
```python
from prefect import Flow
from prefect.tasks.shell import ShellTask

my_shell = ShellTask(shell="bash")

with Flow("shell_commands") as flow:
    task1 = my_shell(command="ls")
    task2 = my_shell(command="ls | wc -l")
```
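Stripped to its essentials, the "shell at init, command at runtime" split looks roughly like the sketch below. MiniShellTask is a hypothetical, much-simplified analogue of ShellTask built on the standard library's subprocess module, not Prefect's code, and it assumes a POSIX sh is available.

```python
import subprocess
from typing import Optional


class MiniShellTask:
    """Hypothetical, stripped-down analogue of ShellTask (not Prefect's implementation)."""

    def __init__(self, shell: str = "sh", command: Optional[str] = None) -> None:
        self.shell = shell        # fixed at initialization
        self.command = command    # optional default command

    def run(self, command: Optional[str] = None) -> str:
        # A runtime command overrides the one given at initialization
        command = command if command is not None else self.command
        if command is None:
            raise ValueError("no command provided")
        result = subprocess.run(
            [self.shell, "-c", command], capture_output=True, text=True, check=True
        )
        return result.stdout.strip()


my_shell = MiniShellTask(shell="sh")
print(my_shell.run(command="echo hello"))                 # hello
print(my_shell.run(command="echo hello | tr a-z A-Z"))    # HELLO
```

The shell stays fixed on the instance while each call supplies its own command, which mirrors how task1 and task2 share one ShellTask configuration.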
```python
from typing import Any

from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs


class YourTask(Task):
    """
    Task for doing something

    Args:
        - your_kwarg (string, optional): an optional kwarg
        - **kwargs: additional keyword arguments to pass to the Task constructor
    """

    def __init__(self, your_kwarg: str = None, **kwargs: Any):
        self.your_kwarg = your_kwarg
        super().__init__(**kwargs)

    @defaults_from_attrs("your_kwarg")
    def run(self, your_kwarg: str = None) -> str:
        """
        Run your task

        Args:
            - your_kwarg (string, optional): an optional kwarg

        Returns:
            - string: description of what it returned
        """
        # Place your specific task run logic inside this function
        return use_your_library(your_kwarg)
```
Note the defaults_from_attrs decorator on run. This decorator reduces the amount of boilerplate code in the task: if a value is set at initialization of the task and is not set again at runtime, the value set at initialization is used in place of the absent runtime value. Values set at runtime, however, always override those set during initialization.
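The fallback rule can be sketched as a small decorator. This is a simplified re-implementation written for illustration, named fill_from_attrs to avoid confusion with Prefect's own decorator, and it assumes run is called with keyword arguments and that None means "not provided". One consequence of that assumption in this sketch: explicitly passing None at runtime also falls back to the init value.

```python
import functools
from typing import Any, Callable, Optional


def fill_from_attrs(*attr_names: str) -> Callable:
    """Hypothetical decorator: fill missing run() kwargs from same-named attributes."""

    def decorator(run_method: Callable) -> Callable:
        @functools.wraps(run_method)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            for name in attr_names:
                # Fall back to the init-time attribute only when no runtime value was given
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return run_method(self, *args, **kwargs)

        return wrapper

    return decorator


class GreetTask:
    def __init__(self, greeting: str = "hello") -> None:
        self.greeting = greeting

    @fill_from_attrs("greeting")
    def run(self, greeting: Optional[str] = None, name: str = "world") -> str:
        return f"{greeting}, {name}"


task = GreetTask(greeting="hi")
print(task.run())                  # hi, world
print(task.run(greeting="hey"))    # hey, world
print(task.run(name="prefect"))    # hi, prefect
```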
Your task subclasses Prefect's Task class, which means it has access to important Prefect task attributes like the configured Prefect logger. Inside your task’s run function you can start logging information right away by accessing the logger off of self, e.g. self.logger.info(...).
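As a rough stand-in for that behavior, the sketch below attaches a standard-library logging.Logger to a base class and logs from run. LoggingTask, CountTask, and the "toy.task.<name>" logger naming are assumptions for illustration only; in Prefect the task logger is configured for you.

```python
import logging
from typing import List


class LoggingTask:
    """Hypothetical base class exposing a per-task logger, similar in spirit to self.logger."""

    def __init__(self, name: str = "LoggingTask") -> None:
        self.name = name
        self.logger = logging.getLogger(f"toy.task.{name}")


class CountTask(LoggingTask):
    def run(self, items: List[str]) -> int:
        # Subclasses can log immediately, without creating their own logger
        self.logger.info("Counting %d items", len(items))
        return len(items)


logging.basicConfig(level=logging.INFO)
print(CountTask(name="count").run(["a", "b", "c"]))  # prints 3; an INFO record is also logged
```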
- EnvVarSecret: retrieves secret values that were set via environment variables
- AWSSecretsManager: pulls secrets from AWS Secrets Manager
```python
from typing import Any

from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs


class YourTask(Task):
    def __init__(self, your_kwarg: str = None, **kwargs: Any):
        ...

    # Allows init kwargs to be passed to the run function if they are not overridden
    @defaults_from_attrs("your_kwarg")
    def run(self, your_kwarg: str = None, your_secret: str = None) -> str:
        authenticate_with_your_service(your_secret)
        ...
```
```python
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.your_framework import YourTask

your_task = YourTask(your_kwarg="init kwarg")

with Flow("your-flow") as flow:
    your_secret = PrefectSecret("your_secret_name")
    your_task(your_secret=your_secret)
```
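The reason secrets go to run rather than __init__ can be shown without Prefect: the secret is looked up when the flow executes and handed straight to run, so it is never stored on the reusable task template. In this sketch read_env_secret is a hypothetical helper loosely modeled on the idea behind EnvVarSecret, ServiceTask and MY_SERVICE_TOKEN are made-up names, and the environment variable is set inline only for the demo.

```python
import os
from typing import Optional


def read_env_secret(name: str) -> str:
    """Hypothetical helper standing in for a secret task such as EnvVarSecret."""
    value = os.environ.get(name)
    if value is None:
        raise KeyError(f"secret {name!r} is not set")
    return value


class ServiceTask:
    def __init__(self, endpoint: str = "https://example.invalid") -> None:
        # Non-sensitive configuration is fine to set at initialization
        self.endpoint = endpoint

    def run(self, api_token: Optional[str] = None) -> str:
        # The secret arrives only at runtime, so it is never kept on the task instance
        if not api_token:
            raise ValueError("api_token is required")
        return f"authenticated to {self.endpoint} with token of length {len(api_token)}"


os.environ["MY_SERVICE_TOKEN"] = "s3cr3t"  # set here only for the demo
task = ServiceTask()
print(task.run(api_token=read_env_secret("MY_SERVICE_TOKEN")))
```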
All keyword arguments of your task's __init__ and run functions must be documented in the docstring. Check out any of the other tasks in the task library as a point of reference!
Finally, add an entry for your task to the outline.toml file in the docs directory:
```toml
[pages.tasks.your_task]
title = "Your Task"
module = "prefect.tasks.your_task"
classes = ["YourTask"]
```
- join our Slack community for ad-hoc questions
- follow us on Twitter for updates
- attend our meetup events for contributors, focused on the internals of Prefect
- visit us on GitHub to open issues and pull requests