The Prefect Task Library
The Prefect community is a welcoming place for developers of all experience levels, and our task library is an excellent way to get involved with contributing to an open-source project.
Josh Meek | Data Engineer
November 8, 2020
The Prefect task library is a constantly growing collection of pre-defined tasks that provide off-the-shelf functionality for working with a wide range of tools, ranging from shell script execution to Kubernetes Job management to sending tweets. A majority of the task library is community supported and thus opens the door for users who want to contribute new tasks or expand the functionality of existing tasks. Tasks in the task library are typically created with a specific goal in mind such as creating a Kubernetes Job with
CreateNamespacedJobor invoking an AWS lambda function with
Above is a table showcasing some of the tasks that have been contributed to the task library for interfacing with various tools and services that users have deemed useful. For a full list of tasks in the library and more information on how to use them visit the API reference documentation for the
Committing to Prefect’s task library is a safe place for new users to learn the ins and outs contributing to an open source project as well as a great way to assist in open source development! Developers from all skill levels are accepted in contributing to Prefect’s task library and we are more than happy to guide users through the process. The Prefect community is designed for collaboration opportunities for developers to discuss, implement, and maintain the growing list of Prefect integrations. Whether it’s in the Prefect Community Slack or directly on the GitHub repo, all community discussions happen in the open, visible to all.
There are a few key reasons why users contribute tasks to the task library:
- Gain experience contributing to an open source project
- Increase adoption for libraries, tools, and frameworks by making an easy route for users of Prefect to interact with them
- Allow for tasks to evolve with Prefect meaning that as paradigms and abstractions change in Prefect the task in the open source library will change with it
- Open up collaboration to thousands of other developers who could use your task (they might fix bugs in the task you weren’t aware of!)
Not to mention that we occasionally send Prefect swag to some of our open source contributors!
Task Library in Action
Just like any other Prefect task, tasks in the task library can be used by importing, initializing and adding them to your flow. The lifecycle of a task can be confusing for users who are not used to deferred computation, so for the sake of clarity let’s review the steps involved in getting a task into a Prefect flow and running it:
- Define: this is the first and most important step in a task’s lifecycle — defining what it does! This is also the step that is most critical to contributors of a new task, and the one we will focus on for the rest of this post.
- Initialize: Users of your task will first need to initialize, or instantiate, the task definition into a
Taskinstance. This is a common place to specify static configuration of your task — things like the task name, default values, etc. Note that all information provided at initialization must be known prior to running your flow.
- Bind:Prefect tasks are most interesting when considered in relation to other tasks — these relationships are managed and tracked by a Prefect flow. There are two ways to bind a task to a flow: by “calling” the task (see examples below), or by using Prefect’s imperative API and explicitly adding the task to your flow object with its associated dependencies.
- Run: The goal of all of this is ultimately to run the task within the context of a flow. This is always handled for you when you call
flow.runor a flow run is triggered via a Prefect backend, taking all triggers and state handler logic into account.
@taskdecorator handles steps 1 and 2 simultaneously: the function you decorate defines your task’s runtime logic, and all keywords passed into the decorator are used when initializing the task!
from prefect import task, Flow from prefect.tasks.shell import ShellTaskls_task = ShellTask(command="ls", return_all=True)@task def show_output(std_out): print(std_out)with Flow("list_files") as flow: ls = ls_task() show_output(ls)
Most keyword arguments for tasks imported from the task library can either be set at initialization for reuse purposes or optionally set and overwritten when defining the flow.
from prefect import task, Flow from prefect.tasks.shell import ShellTask# Will only return the listed files ls_task = ShellTask(command="ls", return_all=True)@task def show_output(std_out): print(std_out)with Flow("count_files") as flow: ls = ls_task() show_output(ls)# Override command to count listed files ls_count = ls_task(command="ls | wc -l") show_output(ls_count)
Tasks in Prefect take a subclass approach that allows users to provide a configurable task “template” meaning that default values can be both set at initialization and optionally overwritten at runtime. Take the following task as an example:
class MyTask(Task): def __init__(self, val = None, **kwargs): self.val = val super().__init__(**kwargs)def run(self, val = None): print(self.val or val)my_task = MyTask(val=42)with Flow("task-with-default") as flow: t1 = my_task() t2 = my_task(val=100)
An instance of
MyTaskis initialized before the definition of the flow and within the flow context that task is copied twice to create two tasks. The first task uses the default value of
42and the second task overrides the value to set
100. This pattern was chosen in order to avoid having to re-initialize the task every time it is needed in the flow. The snippet above is effectively equivalent to the following:
class MyTask(Task): def __init__(self, val = None, **kwargs): self.val = val super().__init__(**kwargs)def run(self, val = None): print(self.val or val)with Flow("task-with-default") as flow: t1 = MyTask(val=42)() t2 = MyTask()(val=100)
Notice above that
MyTaskis instantiated and called two times inside the definition of the flow. The first set of parenthesis are used for initializing the task and the second are for actually passing run information to the task. Sometimes users will attempt to pass values from upstream tasks to a downstream’s initialization function instead of the call to run. That is not possible because the results from upstream tasks are not returned until the task actually runs, therefore it needs to be passed to the call to run:
@task def get_value_1(): return 100@task def get_value_2(): return 200class MyTask(Task): def __init__(self, val = None, **kwargs): self.val = val super().__init__(**kwargs)def run(self, val = None): print(self.val or val)my_task = MyTask()with Flow("task-passing") as flow: t1 = my_task(get_value_1()) t2 = my_task(get_value_2()) # is equivalent towith Flow("task-passing") as flow: t1 = MyTask()(get_value_1()) t2 = MyTask()(get_value_2())
Contributing to the Task Library
We are here to help assist developers contribute to the task library! Below we run through a general guide to contributing to the task library but do not hesitate to reach out to other developers in the community, either on GitHub or the Prefect Community Slack, for help!
In order to build a task for the task library you need to define the task’s
runfunctions. If you recall from above this separation is what allows tasks to be “templated” with configuration and reused within the context of the flow. The
__init__of the task will be called before the flow runs and the
runfunction is where any of your task's logic will live to be executed at runtime. Allowing for kwargs to be set both during initialization and at runtime is key to improving a task's functionality.
One example of this separation in action would be initializing a
ShellTaskwith a specific
shelland then passing in a different
commandat runtime. This will create two tasks in the flow, each with different commands, without having to redefine the shell type.
my_shell = ShellTask(shell="bash")with Flow("shell_commands") as flow: task1 = my_shell(command="ls") task2 = my_shell(command="ls | wc -l")
(For more in depth information on the components of Prefect tasks take a look at The Anatomy of a Prefect Task guide.)
This snippet below is the general structure of a task contained in the task library.
from prefect import Task from prefect.utilities.tasks import defaults_from_attrsclass YourTask(Task): """ Task for doing something Args: - your_kwarg (string, optional): an optional kwarg - **kwargs: additional keyword arguments to pass to the Task constructor """def __init__(self, your_kwarg: str = None, **kwargs: Any): self.your_kwarg = your_kwarg super().__init__(**kwargs)@defaults_from_attrs("your_kwarg") def run(self, your_kwarg: str = None) -> str: """ Run your task Args: - your_kwarg (string, optional): an optional kwarg Returns: - string: description of what it returned """# Place your specific task run logic inside this function return use_your_library(your_kwarg)
In the snippet above there is a special decorator
defaults_from_attrs. This decorator serves the purpose of reducing the amount of boilerplate code in the task. If a value is set at initialization of the task and is not set again at runtime then the value set at initialization will be used in place of the absent runtime value. However, values set at runtime will always override those set during initialization.
An important thing to note is the task you are adding to the task library subclasses the base
Taskclass which means it has access to important Prefect task attributes like the configured Prefect logger. This means that since your task’s
runfunction you can automatically begin logging information by accessing the logger off of
For more examples of how the other tasks in the task library look check out the directory containing all of the task library code.
For more information on contributing to the Prefect library as whole check out the development documentation.
Secrets and Authentication
It is common for tasks in the task library to require some sort of authentication when interacting with services. Prefect has a recommended implementation when it comes to using credentials in a task and that is through the use of a Secret task.
Just like other tasks in the task library Prefect Secret tasks can be community contributed and maintained for interacting with secrets:
- EnvVarSecret: retrieves secret values that were set via environment variables
- AWSSecretsManager: will pull secrets from an AWS Secrets Manager
We welcome and encourage contributions of other Secret tasks that can securely retrieve sensitive information from your desired secret store!
Secret tasks are only able to retrieve secret data during runtime therefore it is required that your secret values be passed into your tasks through the
class YourTask(Task): def __init__(self, your_kwarg: str = None, **kwargs: Any): ...# Allows init kwargs to be passed to the run function if they are not overridden @defaults_from_attrs("your_kwarg") def run(self, your_kwarg: str = None, your_secret: str = None) -> str: authenticate_with_your_service(your_secret) ...
This allows users of the task to use Prefect Secrets to securely pass sensitive information to the task using whatever secret storage mechanism they prefer:
from prefect import Flow from prefect.tasks.secrets import PrefectSecret from prefect.tasks.your_framework import YourTaskyour_task = YourTask(your_kwarg="init kwarg")with Flow("your-flow") as flow: your_secret = PrefectSecret("your_secret_name") your_task(your_secret=your_secret)
Due to the nature of the tasks in the task library interacting with a wide range of services from various contributors it is not always possible for Prefect to maintain tests that communicate with all of these services. Because of this, it is integral that users who are contributing tasks to the task library test their tasks themselves against whichever services the tasks interact with.
However, we do still encourage the contribution of unit tests! The unit tests for tasks in the task library generally test that proper variables are set and used with accompanying mocks for the services that they interact with. For examples of how some of the other tasks in the task library are tested check out the tasks testing directory.
Documentation is critical to helping new users understand how tasks are intended to be used, and what is possible (and not possible) with them.
Tasks in the task library follow Prefect’s standard documentation practices as outlined in the development page on Documentation. This means that kwargs in the task’s
runfunction must be documented in the docstring. Check out any of the other tasks in the task library as a point of reference!
In order for new tasks to appear in the API documentation they need to be added to the
outline.tomlfile in the docs directory:
[pages.tasks.your_task] title = "Your Task" module = "prefect.tasks.your_task" classes = ["YourTask"]
If you are interested in contributing to the task library but don’t know where to start, please send us your questions and feedback — we appreciate the opportunity to work with all of you!
- join our Slack community for ad-hoc questions
- follow us on Twitter for updates
- attend our meetup events for contributors, focused on the internals of Prefect
- visit us on Github to open issues and pull requests
— The Prefect Team