Introducing: The Prefect KV Store

A flexible building block for workflow metadata

Zach Angell | Software Engineer

June 3, 2021

We are delighted to announce an exciting and highly requested new feature: the KV (“Key-Value”) Store!
The KV Store emerged from a unique internal process at Prefect called Product Office Hours, a weekly discussion where anyone at the company can raise and discuss “blue sky” ideas. The more an idea challenges our long-standing assumptions, the better.
Critically, Product Office Hours allow the team to identify and explore problems, not just solutions. In fact, we occasionally have to remind ourselves that it's OK to let an idea sit with more questions than answers; this allows everyone to explore their own solutions and then receive feedback in future discussions. As Ed Catmull says in Creativity, Inc., "...[ideas] are forged through tens of thousands of decisions, often made by dozens of people."
Over the course of several office hours discussions, we identified a common pattern across use cases: the need to track a small piece of data relevant to workflows.
“Just” a piece of data
You have your data, you have your tasks, you have your workflows, and you’re ready to go. You just need a place to store some information to track between your flows.
No big deal, right? It’s “just” a small piece of data, maybe an indicator of the last record processed. Storage is cheap and readily available from your favorite cloud provider.
Simple enough. Let’s use S3. We’ll just have to quickly:
  • Create or decide on an S3 bucket to hold the information
  • Make sure the permissions to the S3 bucket are correct
  • Configure AWS credentials in our execution environment
  • Double-check that the IAM permissions associated with those credentials are correct
  • Write a helper function to write the file to S3
  • Write a helper function to read the file from S3
  • Test the flow locally
  • Test running the flow on Prefect Cloud
  • ... debug, rinse, repeat
If you’re familiar with AWS and Prefect, this may take five to ten minutes. If you’re not, this may take hours.
Valuable time that could be spent solving actual problems (positive engineering), all lost to “just” store one small piece of data (negative engineering).
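To give a sense of the boilerplate involved, the two helper functions from the list above might look roughly like the sketch below. The function names and the JSON payload are illustrative assumptions, not code from Prefect; the client is expected to be a boto3 S3 client, whose `put_object` and `get_object` calls are used here, and it is passed in so the sketch carries no AWS configuration of its own.

```python
import json
from typing import Any


def write_marker(s3_client: Any, bucket: str, key: str, value: dict) -> None:
    """Serialize `value` as JSON and upload it as a single S3 object."""
    body = json.dumps(value).encode("utf-8")
    s3_client.put_object(Bucket=bucket, Key=key, Body=body)


def read_marker(s3_client: Any, bucket: str, key: str) -> dict:
    """Download the object and parse it back into a dict."""
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return json.loads(response["Body"].read().decode("utf-8"))


# In a real flow the client would come from boto3 (not imported here),
# and the bucket and object names below are hypothetical:
#   import boto3
#   s3 = boto3.client("s3")
#   write_marker(s3, "my-metadata-bucket", "etl/last-record.json", {"last_record_id": 1234})
#   read_marker(s3, "my-metadata-bucket", "etl/last-record.json")
```

Even this short version only works once the bucket, credentials, and IAM permissions from the earlier steps are all in place.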
KV Store
KV Store is a managed metadata database within Prefect Cloud.
KV Store allows you to persist and share small pieces of metadata in the form of key-value pairs across your team and flows without having to configure external data storage.
Operations on these global key-value pairs can be performed via the Prefect Python library, CLI, GraphQL API, and UI. Moreover, for teams with Prefect Cloud RBAC, access to key-value pairs can be restricted on a per-role basis.
KV Store is perfect for global configuration and information sharing across flows.
Customers are already using the feature to:
  • Manage a default context for flow runs
  • Track the last record processed by a task
  • Store information about the last execution of a flow run
  • ... and much more!
Let’s make this concrete with a simple illustrative example. In this example, we want to track the last date a flow has been executed and pick up from that date. While this information is typically available via Prefect's GraphQL API, retrieving it requires constructing the appropriate query, and the recorded value cannot be mutated or customized.
With KV Store, the last execution date is actually "just" a piece of data that can be managed with a few lines of code!
from datetime import datetime, timedelta
import prefect
from prefect import task, Flow
from prefect.backend import set_key_value, get_key_value

LAST_EXECUTED_KEY = 'my-etl-flow-last-executed'

@task
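def get_last_execution_date():
    # Assumes the key has already been set once (for example via the UI);
    # reading a key that does not exist is expected to raise an error
    last_executed = get_key_value(LAST_EXECUTED_KEY)
    return datetime.strptime(last_executed, "%Y-%m-%d")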

@task
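def run_etl(start_date):
    logger = prefect.context.get("logger")
    # Process one day at a time, from start_date through today
    while start_date <= datetime.today():
        logger.info(f"Running ETL for date {start_date.strftime('%Y-%m-%d')}")
        # do some etl
        start_date += timedelta(days=1)
    # After the loop, start_date is the first day not yet processed,
    # which is where the next run should pick up
    return start_date.strftime('%Y-%m-%d')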

@task
def set_last_execution_date(date):
    set_key_value(key=LAST_EXECUTED_KEY, value=date)

with Flow('my-etl-flow') as flow:
    last_executed_date = get_last_execution_date()
    final_execution_date = run_etl(last_executed_date)
    set_last_execution_date(final_execution_date)    
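Two details of the flow above are easy to miss: what happens on the very first run, and which date ends up stored. The sketch below isolates both in plain Python so they can be checked without Prefect. The helper names are hypothetical, the getter is passed in as a parameter, and the assumption that a missing key surfaces as a `ValueError` should be verified against the Prefect version in use.

```python
from datetime import datetime, timedelta
from typing import Callable, List, Tuple

DATE_FORMAT = "%Y-%m-%d"


def read_start_date(get_value: Callable[[str], str], key: str, default: str) -> datetime:
    """Return the stored date, or `default` if the key has never been set.

    Assumption: the getter raises ValueError for a missing key.
    """
    try:
        stored = get_value(key)
    except ValueError:
        stored = default
    return datetime.strptime(stored, DATE_FORMAT)


def plan_run(start_date: datetime, now: datetime) -> Tuple[List[str], str]:
    """Mirror the loop in run_etl: the days to process, and the value stored afterwards."""
    days = []
    current = start_date
    while current <= now:
        days.append(current.strftime(DATE_FORMAT))
        current += timedelta(days=1)
    # `current` is now the first day that has not been processed
    return days, current.strftime(DATE_FORMAT)


days, next_start = plan_run(datetime(2021, 6, 1), datetime(2021, 6, 3, 9, 0))
# days       -> ['2021-06-01', '2021-06-02', '2021-06-03']
# next_start -> '2021-06-04'
```

In other words, the stored value acts as a cursor for the next run: it is the day after the last one processed, so a second run on the same day finds nothing to do.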
We can now easily reference this key-value pair from another flow:
import prefect
from prefect import task, Flow
# This flow only reads the key, so only get_key_value is needed
from prefect.backend import get_key_value

LAST_EXECUTED_KEY = 'my-etl-flow-last-executed'

@task
def log_etl_info_for_marvin():
    logger = prefect.context.get("logger")
    last_executed = get_key_value(LAST_EXECUTED_KEY)
    logger.info(f"ETL flow last ran {last_executed}")
    logger.info("Hi Marvin!")

with Flow('log-last-executed-and-say-hi-to-marvin') as flow:
    log_etl_info_for_marvin()
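The same read-then-update pattern covers the “last record processed” use case mentioned earlier. As a minimal sketch, a plain dict stands in for the KV Store below so the logic can run anywhere; the key name and function are hypothetical, and in a real flow the dict lookups would be replaced by `get_key_value` and `set_key_value`.

```python
from typing import Dict, List, MutableMapping

LAST_RECORD_KEY = "my-etl-flow-last-record-id"  # hypothetical key name


def process_new_records(records: List[Dict], store: MutableMapping[str, int]) -> List[int]:
    """Process only records newer than the stored cursor, then advance the cursor."""
    last_id = store.get(LAST_RECORD_KEY, 0)
    new_ids = sorted(r["id"] for r in records if r["id"] > last_id)
    for record_id in new_ids:
        pass  # do some etl for this record
    if new_ids:
        store[LAST_RECORD_KEY] = new_ids[-1]
    return new_ids


# A plain dict stands in for the KV Store here
store: Dict[str, int] = {}
records = [{"id": 1}, {"id": 2}, {"id": 3}]
first = process_new_records(records, store)                 # processes 1, 2, 3
second = process_new_records(records + [{"id": 4}], store)  # processes only 4
```

Because the cursor only advances after the records are handled, a failed run leaves the stored value untouched and the next run retries from the same point.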
KV Store’s flexible design provides a strong foundation for future improvements. We’ve considered, for example, using key-value pairs to format messages sent via Prefect Cloud Automations, or allowing certain Automations to be triggered when a key is updated.
Consistent with Prefect’s product philosophy, we will continue to refine our understanding of the negative engineering problems our users face in order to maximize the value of future improvements. We’re excited to keep improving KV Store based on the community’s feedback!

Please continue reaching out to us with your questions and feedback. We appreciate the opportunity to work with all of you!
P.S. We’re hiring! We are currently seeking enthusiastic new colleagues for a variety of engineering, marketing, operations, product, and sales roles. Open positions are always posted on our website, and we have referral incentives for anyone in our community who introduces us to someone extraordinary!
Happy engineering!
— The Prefect Team