Prefect Product

Beyond Scheduling

Event-Driven Flows with Prefect

July 09, 2023
Will Raphaelson
Principal Product Manager

Forget about Time

Time is, generally speaking, weird. It's the least-worst way to describe when something has happened, is happening, or will happen, but it's a messy abstraction. In the realm of data pipelines, a scheduled time is an abstraction, or a guess, for when something else has happened that demands a pipeline be executed.

Running a refresh pipeline every hour represents a guess that, in the span of an hour, new data worth moving or displaying elsewhere was produced. Scheduling a pipeline to run weekly is similarly a guess that the supply of or demand for data changes no faster than once per week.

But modern data practitioners need to take action on data as it is produced: emailing users as soon as they sign up, updating fulfillment statuses as soon as an order is placed, and displaying sensor data as soon as a sensor takes a reading.

Event-driven architectures let us stop guessing. Rather than assuming some customer data came in during the past hour, we can run the pipeline the moment new customer data arrives, with no extra steps and no educated guesses.

Time is, of course, useful - pipeline runs almost always cost money, and you might not want to spend money every time an event occurs - but it shouldn’t be the only way to say when something should run. Prefect makes event-driven data pipelines simple.

Event-Driven Flows with Kafka, Event Webhooks and Deployment Triggers

Imagine you have a web app, and luckily, people are signing up! You know that your onboarding email series is most effective if you send it immediately after signup.

In this post, I’ll wire up a system that subscribes to a Kafka topic with my new user signups, pipes those events into Prefect Cloud, and triggers the onboarding flow on receipt of those events. The post assumes basic familiarity with Prefect Cloud. If you’re not quite there yet, check out our docs or hit me up in the Slack Community.

New User Signups to Prefect Events

When a new user signs up for my app, my app produces a message to a Kafka topic. Kafka is an event streaming platform commonly used in distributed applications, and my topic is hosted on Confluent Cloud.

I know that I want to use a Prefect Flow to reach out to those new users as soon as possible after they sign up. To do this, I’m going to need to get these signup messages into Prefect, where I can observe and react to them.

To get messages from my topic into Prefect Cloud, we’re going to take advantage of Prefect Cloud event webhooks. Webhooks are an ultra-lightweight way to expose an endpoint that accepts HTTP traffic, and they are widely used for simple integrations across the modern web. When an HTTP call is made to a Prefect webhook, it emits a Prefect event that captures data from the HTTP call for use downstream.

Using Confluent’s HTTP Sink connector, I can automatically call a Prefect webhook every time a message appears in the topic.
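Conceptually, each delivery from the connector is an HTTP POST to the webhook URL. The minimal sketch below builds that request with Python's standard library so you can see its shape. The URL is a placeholder (copy the real one from your webhook in Prefect Cloud), and the JSON content type is an assumption, not something confirmed about the connector's requests.

```python
import json
import urllib.request

# Placeholder only: use the URL shown for your webhook in Prefect Cloud.
WEBHOOK_URL = "https://api.prefect.cloud/hooks/YOUR-WEBHOOK-SLUG"


def build_webhook_request(message: str, url: str = WEBHOOK_URL) -> urllib.request.Request:
    """Build (but do not send) a POST that carries one message as the request body."""
    return urllib.request.Request(
        url,
        data=message.encode("utf-8"),
        headers={"Content-Type": "application/json"},  # assumed content type
        method="POST",
    )


if __name__ == "__main__":
    signup = json.dumps(
        [{"registertime": 1494287144756, "gender": "OTHER",
          "regionid": "Region_4", "userid": "User_5"}]
    )
    req = build_webhook_request(signup)
    print(req.get_method(), req.full_url)
    # urllib.request.urlopen(req) would actually send it; omitted so this
    # sketch makes no network calls.
```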

So if I get a new user signup message in the form of:

```json
"[{\"registertime\":1494287144756,\"gender\":\"OTHER\",\"regionid\":\"Region_4\",\"userid\":\"User_5\"}]"
```

the connector forwards it to Prefect, which saves the message as a Prefect event in real time. A Jinja template in the webhook configuration does some light transformation, instructing Prefect to construct an event name from the user id in the message.

And we immediately see user events start streaming into Prefect Cloud.
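The webhook template itself isn't reproduced in this post. As a rough stand-in, the sketch below mirrors that transformation in plain Python rather than Jinja. It decodes the double-encoded message (a JSON string whose contents are a JSON array) and shapes each record like a Prefect event with `event`, `resource`, and `payload` fields. The event name and resource id formats are hypothetical choices, not the ones used in the post.

```python
import json
from typing import Any, Dict, List


def parse_signup_message(raw: str) -> List[Dict[str, Any]]:
    """Decode a signup message; the outer layer is a JSON string wrapping a JSON array."""
    inner = json.loads(raw)
    # Decode a second time only if the first pass produced a string.
    return json.loads(inner) if isinstance(inner, str) else inner


def to_event(record: Dict[str, Any]) -> Dict[str, Any]:
    """Shape one signup record like a Prefect event (naming scheme is hypothetical)."""
    user_id = record["userid"]
    return {
        "event": f"user.signup.{user_id}",  # hypothetical event name built from the user id
        "resource": {"prefect.resource.id": f"user.{user_id}"},  # hypothetical resource id
        "payload": record,  # keeps the userid reachable as event.payload.userid
    }


raw = r'"[{\"registertime\":1494287144756,\"gender\":\"OTHER\",\"regionid\":\"Region_4\",\"userid\":\"User_5\"}]"'
events = [to_event(r) for r in parse_signup_message(raw)]
print(events[0]["event"])  # user.signup.User_5
```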

New User Events to Deployment Triggers

Now that I have my new user signups streaming into Prefect, it’s time to do something with them. As it happens (😉) I have the following Python code, which sends a welcome email, checks to ensure that the email didn’t bounce, and updates an internal marketing database. I’ve turned this code into a flow and my functions into tasks with Prefect’s decorators and deployed it to Prefect Cloud.

```python
from prefect import flow, task


@task(log_prints=True)
def send_welcome_email(name: str = "will") -> None:
    print(f"Sending onboarding campaign to {name}!")
    ...  # placeholder: send the email through your provider


@task(log_prints=True)
def check_for_bounce(email_id: int = 12345) -> None:
    print(f"checking for bounce on email {email_id}")
    ...  # placeholder: ask your provider whether the email bounced


@task(log_prints=True)
def update_marketing_tracker(email_id: int = 12345) -> None:
    print(f"updating marketing tracker for email {email_id}")


@flow
def new_user_flow(name: str) -> None:
    # `name` receives the user id from the triggering event
    send_welcome_email(name)
    check_for_bounce()
    update_marketing_tracker()


if __name__ == "__main__":
    new_user_flow(name="will")
```

I need this flow to run when a new user event hits my system, using the user id from the event payload as the input parameter to the flow. There are several ways to do this, both in code and in the UI, but the simplest is to navigate to an event like the ones you want to trigger your flow run, and click Automate.

This bootstraps a trigger definition that listens for new user events. Then we can proceed to the next step of the wizard to tell it what to do when the trigger fires.
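A trigger decides whether an incoming event is one it cares about by comparing the event name against the patterns it expects; Prefect's event triggers accept trailing wildcards such as `prefect.flow-run.*`. The sketch below is a simplified model of that matching using `fnmatch`, not Prefect's own matcher, and the `user.signup.*` event names are hypothetical.

```python
from fnmatch import fnmatchcase
from typing import Iterable


def trigger_matches(event_name: str, expect: Iterable[str]) -> bool:
    """Return True if the event name matches any expected pattern (simplified model)."""
    return any(fnmatchcase(event_name, pattern) for pattern in expect)


expected = {"user.signup.*"}  # hypothetical pattern for new user events
print(trigger_matches("user.signup.User_5", expected))  # True
print(trigger_matches("order.placed.42", expected))  # False
```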

You’ll notice some interesting syntax in the user_id field. Automations capture information from the event that triggered them, and you use Jinja syntax to pass that information into your flow run at runtime. {{ event.payload.userid }} says to take the userid from the captured event and use it as the main parameter value for the flow. Once created, this automation is linked to the deployment and visible on the deployment details page under “Triggers”.
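To make the templating step concrete, the sketch below resolves `{{ dotted.path }}` placeholders by walking nested dictionaries. It is a deliberately tiny stand-in for Jinja's attribute lookup, not how Prefect renders parameters, and the event contents are hypothetical. The resolved value is passed to the flow's `name` parameter, matching `new_user_flow(name: str)`.

```python
import re
from typing import Any, Dict

# Matches placeholders such as "{{ event.payload.userid }}".
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template: str, context: Dict[str, Any]) -> str:
    """Replace each {{ dotted.path }} with the value found by walking `context`."""

    def lookup(match: re.Match) -> str:
        value: Any = context
        for key in match.group(1).split("."):
            value = value[key]  # step one level deeper per dotted segment
        return str(value)

    return _PLACEHOLDER.sub(lookup, template)


event = {"event": "user.signup.User_5", "payload": {"userid": "User_5", "regionid": "Region_4"}}
parameters = {"name": render("{{ event.payload.userid }}", {"event": event})}
print(parameters)  # {'name': 'User_5'}
```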

Putting it all Together

Once this link is in place, flows start kicking off in response to the new user messages from Kafka.

Together, our flows and events tell the story of a new user signing up and getting their welcome email just seconds after their signup.

This is just the tip of the iceberg on event-driven flows with Prefect. Triggers can fire when expected events are not observed, webhooks can use Jinja to flexibly transform payloads from CloudEvents or custom calls into Prefect events, and you can specify triggering events directly on deployments without having to create automations. Give event-driven flows a try and let me know what you think in the community Slack!
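Firing on a lack of events inverts the usual check: instead of reacting when an event arrives, the trigger fires when no matching event has been seen within a time window. Prefect's trigger definitions express this with a proactive posture and a `within` window, evaluated on the server. The function below is only a conceptual model of that decision, with hypothetical inputs.

```python
from datetime import datetime, timedelta
from typing import Optional


def should_fire_proactive(last_seen: Optional[datetime], now: datetime, within: timedelta) -> bool:
    """Fire when no matching event has been observed in the trailing `within` window."""
    return last_seen is None or now - last_seen > within


now = datetime(2023, 7, 9, 12, 0)
print(should_fire_proactive(now - timedelta(minutes=30), now, timedelta(hours=1)))  # False
print(should_fire_proactive(now - timedelta(hours=2), now, timedelta(hours=1)))  # True
```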