
Change Data Capture Tutorial: Real-Time Event Workflows with Debezium and Prefect

August 13, 2025
Brendan Dalpe
Sales Engineer


The move from batch schedules to real-time, event-driven systems is transforming how organizations work with data. While cron jobs and scheduled processes still have their place, modern businesses increasingly demand instant insights and immediate action.

Unfortunately, legacy systems that were built in an era of web front-ends, middle tiers, and monolithic databases often lack native mechanisms for triggering remote processes. Rewriting them for modern architectures can be expensive, risky, and yield little return on investment.

So how can you modernize without rewriting? Enter Change Data Capture (CDC).

CDC: Modernizing Without Touching Legacy Code

Change Data Capture continuously monitors your database’s transaction logs, emitting change events without modifying your application code. Your legacy app keeps running as-is, while downstream systems can react instantly to new or updated data.

Debezium: Real-Time CDC for Popular Databases

Debezium is an open-source, distributed platform that streams database changes in real time. It supports PostgreSQL, MySQL, and more, making it a perfect bridge from old systems to modern event pipelines.

Debezium reads transaction logs, converts them into event streams, and delivers them to sinks like Kafka, Pulsar, or even plain HTTP endpoints. No invasive schema changes, no rewrites.

Adding Prefect to Orchestrate Real-Time Workflows

Debezium supports CloudEvents, a standardized way of describing event data. By pairing this with Prefect’s webhooks, you can trigger automated workflows the instant a change occurs.

Here’s the high-level flow:

  1. Debezium detects a database change.
  2. Debezium HTTP Sink sends a CloudEvent to a Prefect webhook.
  3. Prefect Automation instantly triggers the relevant workflow.

This setup turns your database into a live event source without modifying application logic.

Local Setup: PostgreSQL + Debezium + Prefect

To demonstrate this integration, we’ll create a simple local environment using Docker Compose. This setup simulates a real-world scenario: you have a legacy PostgreSQL database containing customer data, want to capture its changes in real time using Debezium, and feed those changes directly into Prefect workflows via webhooks.

```yaml
services:
  postgres:
    image: postgres:17
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: app
    command: ["postgres", "-c", "wal_level=logical"]
    ports:
      - "5432:5432"
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

  debezium-server:
    image: quay.io/debezium/server:3.2
    volumes:
      - ./debezium/config:/debezium/config
    depends_on:
      - postgres
```

Here, PostgreSQL is configured with logical replication enabled via wal_level=logical, which is a prerequisite for CDC. The mounted init.sql script will set up the demo schema and replication roles automatically on startup.

Database Initialization

The init.sql will handle a few requirements for the local CDC test environment:

  1. It defines a customers table to serve as our example data source.
  2. It configures Postgres to provide full row images for update events and grants a replication role (replicator) the minimal privileges needed for Debezium to read from the database’s logical replication stream.
  3. It sets up a publication that only includes INSERT events for the customers table, helping us focus on a specific event type.
```sql
-- create the application table
CREATE TABLE public.customers (
  id SERIAL PRIMARY KEY,
  first_name TEXT NOT NULL,
  last_name TEXT NOT NULL,
  email TEXT UNIQUE NOT NULL
);

-- ensure full row image for updates
ALTER TABLE public.customers REPLICA IDENTITY FULL;

-- create a replication role
CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'replicator';
GRANT CONNECT ON DATABASE app TO replicator;
GRANT USAGE ON SCHEMA public TO replicator;
GRANT SELECT ON public.customers TO replicator;

-- publish only inserts to the logical replication stream
CREATE PUBLICATION dbz_demo_events FOR TABLE public.customers WITH (publish = 'insert');
```

By limiting the publication to only INSERT events, we avoid unnecessary noise in our event stream. This setup mimics how you might selectively capture changes from a busy production database without overloading downstream systems.
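As a minimal sketch of this idea on the consumer side: Debezium change events conventionally carry an `op` field ("c" for create/insert, "u" for update, "d" for delete). The publication filter already guarantees only inserts reach the sink, but a downstream handler can defensively check the same thing. This guard and its field names are an assumption for illustration, not part of the tutorial's configuration.

```python
# Hypothetical defensive filter on the consumer side. Assumes the change
# payload carries Debezium's conventional `op` field ("c" = create/insert,
# "u" = update, "d" = delete); the publication filter above already
# guarantees only inserts arrive, so this is belt-and-suspenders.
def is_insert(change_payload: dict) -> bool:
    """Return True only for insert (create) change events."""
    return change_payload.get("op") == "c"


insert_event = {"op": "c", "before": None, "after": {"id": 1}}
update_event = {"op": "u", "before": {"id": 1}, "after": {"id": 1}}

print(is_insert(insert_event))  # True
print(is_insert(update_event))  # False
```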

Create a Prefect Webhook

To receive and process events emitted by Debezium, you’ll need to configure a webhook in Prefect Cloud.

Note: Webhooks are available exclusively on Prefect’s paid subscription plans.


Using the {{ body | from_cloud_events(headers) }} CloudEvent template allows us to process the entire payload and convert it into a JSON structure for use in a future Prefect automation.

Debezium Configuration

Our Debezium configuration connects the PostgreSQL source and the Prefect webhook destination. By using the HTTP sink type, we can forward events directly to Prefect, benefiting from Debezium’s CDC capabilities.

```properties
# == Sink (HTTP) ==
debezium.sink.type=http
debezium.sink.http.url=https://api.prefect.cloud/hooks/<hash>

# == Source (PostgreSQL) ==
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=replicator
debezium.source.database.password=replicator
debezium.source.database.dbname=app
debezium.source.plugin.name=pgoutput
# must match the CREATE PUBLICATION statement in init.sql
debezium.source.publication.name=dbz_demo_events
debezium.source.publication.autocreate.mode=disabled

# == Format ==
debezium.format.value=cloudevents
```

The key detail here is debezium.format.value=cloudevents, which ensures all events are wrapped in the CloudEvents standard format. This makes them easy for Prefect to interpret and reliably process, complete with metadata describing the nature and context of each change event.
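To exercise the webhook without running the full Debezium stack, you can hand-craft a similar envelope and POST it yourself. The sketch below is an assumption-laden approximation of what the HTTP sink emits (field names follow the CloudEvents 1.0 spec and the sample payload shown later, but it is not byte-for-byte what Debezium sends); the webhook URL is the same placeholder used in the config above.

```python
# Approximate the CloudEvents envelope that Debezium's HTTP sink sends, so
# the Prefect webhook can be smoke-tested without running Debezium itself.
# This is a sketch: field values mimic the sample payload shown later in
# this post, and WEBHOOK_URL is a placeholder to substitute with your own.
import json
import uuid
import urllib.request
from datetime import datetime, timezone

WEBHOOK_URL = "https://api.prefect.cloud/hooks/<hash>"  # placeholder


def build_cloudevent(after_row: dict) -> dict:
    """Build a minimal CloudEvents 1.0 envelope for an INSERT change event."""
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": "/debezium/postgresql/app",
        "type": "io.debezium.connector.postgresql.DataChangeEvent",
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": {"payload": {"before": None, "after": after_row}},
    }


def send(event: dict) -> int:
    """POST the envelope in CloudEvents structured mode; returns HTTP status."""
    req = urllib.request.Request(
        WEBHOOK_URL,
        data=json.dumps(event).encode(),
        headers={"Content-Type": "application/cloudevents+json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status


row = {"id": 1, "first_name": "Brendan", "last_name": "Dalpe",
       "email": "brendan.dalpe@prefect.io"}
event = build_cloudevent(row)
print(event["type"])
# To actually deliver it (requires a real webhook URL):
# send(event)
```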

Events in Prefect

With our Debezium configuration in place, we can now examine exactly what Prefect receives when a database change is forwarded.

For example, inserting a row:

```sql
INSERT INTO public.customers (first_name, last_name, email)
VALUES ('Brendan', 'Dalpe', 'brendan.dalpe@prefect.io');
```

will produce an event with the type io.debezium.connector.postgresql.DataChangeEvent. We can see this on the Events page in the Prefect UI.

Inside the Raw tab in Prefect’s event inspector, the incoming CloudEvent is fully parsed into JSON, making it easy to inspect the entire payload and confirm the change details.

Here’s an example of the full CloudEvent payload received and parsed by Prefect:

```json
{
  "id": "06897a3b-27de-7396-8000-addbd281b59a",
  "account": "0ff44498-d380-4d7b-bd68-9b52da03823f",
  "event": "io.debezium.connector.postgresql.DataChangeEvent",
  "occurred": "2025-08-09T19:38:25.744Z",
  "payload": {
    "cloudevents": {
      "specversion": "1.0",
      "id": "name:app;lsn:26595192;txId:765;sequence:[null,\"26595192\"]",
      "dataschema": null,
      "datacontenttype": "application/json"
    },
    "data": {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "default": 0,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "app.public.customers.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "default": 0,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "app.public.customers.Value",
            "field": "after"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Data"
      },
      "payload": {
        "before": null,
        "after": {
          "id": 1,
          "first_name": "Brendan",
          "last_name": "Dalpe",
          "email": "brendan.dalpe@prefect.io"
        }
      }
    }
  },
  "received": "2025-08-09T19:38:26.492Z",
  "related": [
    {
      "prefect.resource.id": "/debezium/postgresql/app",
      "prefect.resource.role": "cloudevent-source"
    },
    {
      "prefect.resource.id": "prefect-cloud.webhook.7b69ad6c-b49b-48b1-a0b0-320de001a8c5",
      "prefect.resource.name": "debezium",
      "prefect.resource.role": "webhook"
    }
  ],
  "resource": {
    "prefect.resource.id": "/debezium/postgresql/app"
  },
  "workspace": "4aee2d55-a0c5-444a-b7ca-f6d19fc574f2"
}
```

Triggering a Flow in Prefect through Automation

Let’s walk through a minimal example of using this event to trigger a Prefect flow. In this case, we’ll simply print the after section of the change event payload. The relevant portion of the event payload will be passed as a parameter to our workflow. Thanks to log_prints=True, the row data will appear directly in the Prefect UI.

We’ll use the Prefect Python SDK to configure the deployment and automation programmatically:

```python
from pathlib import Path
from prefect import flow
from prefect.events import DeploymentEventTrigger

@flow(log_prints=True)
def echo(event: object):
    print(event)

if __name__ == "__main__":
    echo.from_source(
        source=Path(__file__).parent,
        entrypoint="main.py:echo",
    ).deploy(
        name="webhook-echo",
        work_pool_name="local",
        triggers=[
            DeploymentEventTrigger(
                name="webhook-echo-trigger",
                expect=["io.debezium.connector.postgresql.DataChangeEvent"],
                parameters={
                    "event": {
                        "template": "{{ event.payload.data.payload.after }}",
                        "__prefect_kind": "jinja",
                    }
                },
            )
        ],
    )
```

When you run this script (python main.py), Prefect will automatically create:

  • A Deployment named webhook-echo for our demonstration.
  • An Automation named webhook-echo-trigger that listens for the DataChangeEvent type.

The __prefect_kind: "jinja" marker tells Prefect to render the template against the triggering event at runtime, injecting the extracted row data into the flow’s parameters.
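In plain Python, the jinja lookup {{ event.payload.data.payload.after }} is just a walk through the nested event structure. The sketch below replays that lookup against a trimmed version of the sample payload shown earlier, to make explicit what the echo flow receives as its parameter.

```python
# Plain-Python equivalent of the jinja template
# {{ event.payload.data.payload.after }}, applied to a trimmed copy of the
# sample CloudEvent payload shown earlier in this post.
event = {
    "payload": {
        "data": {
            "payload": {
                "before": None,
                "after": {
                    "id": 1,
                    "first_name": "Brendan",
                    "last_name": "Dalpe",
                    "email": "brendan.dalpe@prefect.io",
                },
            }
        }
    }
}

# Each attribute access in the template becomes a dict lookup here.
after = event["payload"]["data"]["payload"]["after"]
print(after["email"])  # brendan.dalpe@prefect.io
```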

Tip: For local testing, run a worker with:

```shell
prefect worker start --pool local
```

The pool name must match the work_pool_name used in the deployment (local in this example).

Automation created by the Python code

Automation configuration details

Testing the End-to-End Workflow

With the webhook in place, inserting another row:

```sql
INSERT INTO public.customers (first_name, last_name, email)
VALUES ('Marvin', 'Android', 'totallynotparanoid@hitchhikingthegalaxy.com');
```

will immediately trigger the automation. Within the Runs page of Prefect, you’ll see the new run along with the event data in the logs.

This completes an end-to-end pipeline: a database change flows through Debezium, is sent as a CloudEvent to Prefect via an HTTP webhook, and instantly launches an automated workflow using the actual row data. This was done without modifying the original application code.

Conclusion: Real-Time Without the Rewrite

By combining CDC capabilities with Prefect’s orchestration, you can:

  • Modernize without rewriting
  • Enable instant workflows
  • Cut reaction time from batch windows measured in minutes to event-driven responses in milliseconds

This approach allows you to breathe new life into existing infrastructure while unlocking the power of event-driven automation. Instead of costly, risky rewrites, you can extend the usefulness of your legacy systems, integrate them into modern architectures, and respond to changes in your data as they happen.

Your legacy systems don’t need to be replaced to remain competitive — they simply need to be connected. With Prefect as the automation engine, those connections become a seamless bridge between the past and the future of your data operations.


Happy Engineering!